You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2016/04/04 18:09:39 UTC
[30/42] activemq-artemis git commit: ARTEMIS-463 Improvement to the
openwire testsuite https://issues.apache.org/jira/browse/ARTEMIS-463
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
index 97cd6f6..9729793 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
@@ -32,6 +32,7 @@ import junit.framework.TestCase;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
+import org.apache.activemq.broker.region.SubscriptionStatistics;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage;
@@ -53,12 +54,13 @@ import org.slf4j.LoggerFactory;
/**
* @author gtully
* @see https://issues.apache.org/activemq/browse/AMQ-2020
- */
+ **/
public class QueueDuplicatesFromStoreTest extends TestCase {
+ private static final Logger LOG = LoggerFactory
+ .getLogger(QueueDuplicatesFromStoreTest.class);
- private static final Logger LOG = LoggerFactory.getLogger(QueueDuplicatesFromStoreTest.class);
-
- ActiveMQQueue destination = new ActiveMQQueue("queue-" + QueueDuplicatesFromStoreTest.class.getSimpleName());
+ ActiveMQQueue destination = new ActiveMQQueue("queue-"
+ + QueueDuplicatesFromStoreTest.class.getSimpleName());
BrokerService brokerService;
final static String mesageIdRoot = "11111:22222:";
@@ -89,7 +91,7 @@ public class QueueDuplicatesFromStoreTest extends TestCase {
}
public void testNoDuplicateAfterCacheFullAndAckedWithLargeAuditDepth() throws Exception {
- doTestNoDuplicateAfterCacheFullAndAcked(1024 * 10);
+ doTestNoDuplicateAfterCacheFullAndAcked(1024*10);
}
public void testNoDuplicateAfterCacheFullAndAckedWithSmallAuditDepth() throws Exception {
@@ -97,13 +99,15 @@ public class QueueDuplicatesFromStoreTest extends TestCase {
}
public void doTestNoDuplicateAfterCacheFullAndAcked(final int auditDepth) throws Exception {
- final PersistenceAdapter persistenceAdapter = brokerService.getPersistenceAdapter();
- final MessageStore queueMessageStore = persistenceAdapter.createQueueMessageStore(destination);
+ final PersistenceAdapter persistenceAdapter = brokerService.getPersistenceAdapter();
+ final MessageStore queueMessageStore =
+ persistenceAdapter.createQueueMessageStore(destination);
final ConnectionContext contextNotInTx = new ConnectionContext();
final ConsumerInfo consumerInfo = new ConsumerInfo();
final DestinationStatistics destinationStatistics = new DestinationStatistics();
consumerInfo.setExclusive(true);
- final Queue queue = new Queue(brokerService, destination, queueMessageStore, destinationStatistics, brokerService.getTaskRunnerFactory());
+ final Queue queue = new Queue(brokerService, destination,
+ queueMessageStore, destinationStatistics, brokerService.getTaskRunnerFactory());
// a workaround for this issue
// queue.setUseCache(false);
@@ -134,34 +138,38 @@ public class QueueDuplicatesFromStoreTest extends TestCase {
// pull from store in small windows
Subscription subscription = new Subscription() {
+ private SubscriptionStatistics subscriptionStatistics = new SubscriptionStatistics();
+
@Override
public void add(MessageReference node) throws Exception {
if (enqueueCounter.get() != node.getMessageId().getProducerSequenceId()) {
- errors.add("Not in sequence at: " + enqueueCounter.get() + ", received: " + node.getMessageId().getProducerSequenceId());
+ errors.add("Not in sequence at: " + enqueueCounter.get() + ", received: "
+ + node.getMessageId().getProducerSequenceId());
}
- assertEquals("is in order", enqueueCounter.get(), node.getMessageId().getProducerSequenceId());
+ assertEquals("is in order", enqueueCounter.get(), node
+ .getMessageId().getProducerSequenceId());
receivedLatch.countDown();
enqueueCounter.incrementAndGet();
node.decrementReferenceCount();
}
@Override
- public void add(ConnectionContext context, Destination destination) throws Exception {
+ public void add(ConnectionContext context, Destination destination)
+ throws Exception {
}
@Override
public int countBeforeFull() {
if (isFull()) {
return 0;
- }
- else {
+ } else {
return fullWindow - (int) (enqueueCounter.get() - ackedCount.get());
}
}
@Override
public void destroy() {
- }
+ };
@Override
public void gc() {
@@ -253,7 +261,8 @@ public class QueueDuplicatesFromStoreTest extends TestCase {
}
@Override
- public boolean matches(MessageReference node, MessageEvaluationContext context) throws IOException {
+ public boolean matches(MessageReference node,
+ MessageEvaluationContext context) throws IOException {
return true;
}
@@ -263,11 +272,13 @@ public class QueueDuplicatesFromStoreTest extends TestCase {
}
@Override
- public void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception {
+ public void processMessageDispatchNotification(
+ MessageDispatchNotification mdn) throws Exception {
}
@Override
- public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception {
+ public Response pullMessage(ConnectionContext context,
+ MessagePull pull) throws Exception {
return null;
}
@@ -277,7 +288,8 @@ public class QueueDuplicatesFromStoreTest extends TestCase {
}
@Override
- public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
+ public List<MessageReference> remove(ConnectionContext context,
+ Destination destination) throws Exception {
return null;
}
@@ -286,7 +298,9 @@ public class QueueDuplicatesFromStoreTest extends TestCase {
}
@Override
- public void setSelector(String selector) throws InvalidSelectorException, UnsupportedOperationException {
+ public void setSelector(String selector)
+ throws InvalidSelectorException,
+ UnsupportedOperationException {
}
@Override
@@ -294,7 +308,8 @@ public class QueueDuplicatesFromStoreTest extends TestCase {
}
@Override
- public boolean addRecoveredMessage(ConnectionContext context, MessageReference message) throws Exception {
+ public boolean addRecoveredMessage(ConnectionContext context,
+ MessageReference message) throws Exception {
return false;
}
@@ -304,16 +319,18 @@ public class QueueDuplicatesFromStoreTest extends TestCase {
}
@Override
- public void acknowledge(ConnectionContext context, MessageAck ack) throws Exception {
+ public void acknowledge(ConnectionContext context, MessageAck ack)
+ throws Exception {
}
@Override
- public int getCursorMemoryHighWaterMark() {
+ public int getCursorMemoryHighWaterMark(){
return 0;
}
@Override
- public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) {
+ public void setCursorMemoryHighWaterMark(
+ int cursorMemoryHighWaterMark) {
}
@Override
@@ -336,14 +353,24 @@ public class QueueDuplicatesFromStoreTest extends TestCase {
}
@Override
- public void incrementConsumedCount() {
+ public void incrementConsumedCount(){
}
@Override
- public void resetConsumedCount() {
+ public void resetConsumedCount(){
}
+
+ @Override
+ public SubscriptionStatistics getSubscriptionStatistics() {
+ return subscriptionStatistics;
+ }
+
+ @Override
+ public long getInFlightMessageSize() {
+ return subscriptionStatistics.getInflightMessageSize().getTotalSize();
+ }
};
queue.addSubscription(contextNotInTx, subscription);
@@ -356,9 +383,12 @@ public class QueueDuplicatesFromStoreTest extends TestCase {
for (int j = 0; j < ackBatchSize; j++, removeIndex++) {
ackedCount.incrementAndGet();
MessageAck ack = new MessageAck();
- ack.setLastMessageId(new MessageId(mesageIdRoot + removeIndex));
+ ack.setLastMessageId(new MessageId(mesageIdRoot
+ + removeIndex));
ack.setMessageCount(1);
- queue.removeMessage(contextNotInTx, subscription, new IndirectMessageReference(getMessage(removeIndex)), ack);
+ queue.removeMessage(contextNotInTx, subscription,
+ new IndirectMessageReference(
+ getMessage(removeIndex)), ack);
queue.wakeup();
}
@@ -373,7 +403,8 @@ public class QueueDuplicatesFromStoreTest extends TestCase {
assertTrue("There are no errors: " + errors, errors.isEmpty());
assertEquals(count, enqueueCounter.get());
- assertEquals("store count is correct", count - removeIndex, queueMessageStore.getMessageCount());
+ assertEquals("store count is correct", count - removeIndex,
+ queueMessageStore.getMessageCount());
}
private Message getMessage(int i) throws Exception {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java
index b38a965..c9d0339 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java
@@ -61,7 +61,6 @@ import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.state.ProducerState;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.thread.TaskRunnerFactory;
-
import junit.framework.TestCase;
public class SubscriptionAddRemoveQueueTest extends TestCase {
@@ -177,16 +176,20 @@ public class SubscriptionAddRemoveQueueTest extends TestCase {
public class SimpleImmediateDispatchSubscription implements Subscription, LockOwner {
- List<MessageReference> dispatched = Collections.synchronizedList(new ArrayList<MessageReference>());
+ private SubscriptionStatistics subscriptionStatistics = new SubscriptionStatistics();
+ List<MessageReference> dispatched =
+ Collections.synchronizedList(new ArrayList<MessageReference>());
+
@Override
- public void acknowledge(ConnectionContext context, MessageAck ack) throws Exception {
+ public void acknowledge(ConnectionContext context, MessageAck ack)
+ throws Exception {
}
@Override
public void add(MessageReference node) throws Exception {
// immediate dispatch
- QueueMessageReference qmr = (QueueMessageReference) node;
+ QueueMessageReference qmr = (QueueMessageReference)node;
qmr.lock(this);
dispatched.add(qmr);
}
@@ -400,5 +403,15 @@ public class SubscriptionAddRemoveQueueTest extends TestCase {
return 10;
}
+ @Override
+ public SubscriptionStatistics getSubscriptionStatistics() {
+ return subscriptionStatistics;
+ }
+
+ @Override
+ public long getInFlightMessageSize() {
+ return subscriptionStatistics.getInflightMessageSize().getTotalSize();
+ }
+
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java
deleted file mode 100644
index ab388f0..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java
+++ /dev/null
@@ -1,432 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.broker.region.cursors;
-
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.AutoFailTestSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.jmx.QueueViewMBean;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.broker.region.policy.StorePendingQueueMessageStoragePolicy;
-import org.apache.activemq.usage.MemoryUsage;
-import org.apache.activemq.usage.StoreUsage;
-import org.apache.activemq.usage.SystemUsage;
-import org.apache.activemq.usage.TempUsage;
-import org.apache.activemq.util.Wait;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Modified CursorSupport Unit test to reproduce the negative queue issue.
- *
- * Keys to reproducing:
- * 1) Consecutive queues with listener on first sending to second queue
- * 2) Push each queue to the memory limit
- * This seems to help reproduce the issue more consistently, but
- * we have seen times in our production environment where the
- * negative queue can occur without. Our memory limits are
- * very high in production and it still happens in varying
- * frequency.
- * 3) Prefetch
- * Lowering the prefetch down to 10 and below seems to help
- * reduce occurrences.
- * 4) # of consumers per queue
- * The issue occurs less with fewer consumers
- *
- * Things that do not affect reproduction:
- * 1) Spring - we use spring in our production applications, but this test case works
- * with or without it.
- * 2) transacted
- */
-public class NegativeQueueTest extends AutoFailTestSupport {
-
- private static final Logger LOG = LoggerFactory.getLogger(NegativeQueueTest.class);
-
- public static SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMdd,hh:mm:ss:SSS");
-
- private static final String QUEUE_1_NAME = "conn.test.queue.1";
- private static final String QUEUE_2_NAME = "conn.test.queue.2";
-
- private static final long QUEUE_MEMORY_LIMIT = 2097152;
- private static final long MEMORY_USAGE = 400000000;
- private static final long TEMP_USAGE = 200000000;
- private static final long STORE_USAGE = 1000000000;
- // ensure we exceed the cache 70%
- private static final int MESSAGE_COUNT = 2100;
-
- protected static final boolean TRANSACTED = true;
- protected static final boolean DEBUG = true;
- protected static int NUM_CONSUMERS = 20;
- protected static int PREFETCH_SIZE = 1000;
-
- protected BrokerService broker;
- protected String bindAddress = "tcp://localhost:0";
-
- public void testWithDefaultPrefetch() throws Exception {
- PREFETCH_SIZE = 1000;
- NUM_CONSUMERS = 20;
- blastAndConsume();
- }
-
- public void x_testWithDefaultPrefetchFiveConsumers() throws Exception {
- PREFETCH_SIZE = 1000;
- NUM_CONSUMERS = 5;
- blastAndConsume();
- }
-
- public void x_testWithDefaultPrefetchTwoConsumers() throws Exception {
- PREFETCH_SIZE = 1000;
- NUM_CONSUMERS = 2;
- blastAndConsume();
- }
-
- public void testWithDefaultPrefetchOneConsumer() throws Exception {
- PREFETCH_SIZE = 1000;
- NUM_CONSUMERS = 1;
- blastAndConsume();
- }
-
- public void testWithMediumPrefetch() throws Exception {
- PREFETCH_SIZE = 50;
- NUM_CONSUMERS = 20;
- blastAndConsume();
- }
-
- public void x_testWithSmallPrefetch() throws Exception {
- PREFETCH_SIZE = 10;
- NUM_CONSUMERS = 20;
- blastAndConsume();
- }
-
- public void testWithNoPrefetch() throws Exception {
- PREFETCH_SIZE = 1;
- NUM_CONSUMERS = 20;
- blastAndConsume();
- }
-
- public void blastAndConsume() throws Exception {
- LOG.info(getName());
- ConnectionFactory factory = createConnectionFactory();
-
- //get proxy queues for statistics lookups
- Connection proxyConnection = factory.createConnection();
- proxyConnection.start();
- Session proxySession = proxyConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- final QueueViewMBean proxyQueue1 = getProxyToQueueViewMBean(proxySession.createQueue(QUEUE_1_NAME));
- final QueueViewMBean proxyQueue2 = getProxyToQueueViewMBean(proxySession.createQueue(QUEUE_2_NAME));
-
- // LOAD THE QUEUE
- Connection producerConnection = factory.createConnection();
- producerConnection.start();
- Session session = producerConnection.createSession(TRANSACTED, Session.AUTO_ACKNOWLEDGE);
- Destination queue = session.createQueue(QUEUE_1_NAME);
- MessageProducer producer = session.createProducer(queue);
- List<TextMessage> senderList = new ArrayList<>();
- for (int i = 0; i < MESSAGE_COUNT; i++) {
- TextMessage msg = session.createTextMessage(i + " " + formatter.format(new Date()));
- senderList.add(msg);
- producer.send(msg);
- if (TRANSACTED)
- session.commit();
- if (DEBUG && i % 100 == 0) {
- int index = (i / 100) + 1;
- System.out.print(index - ((index / 10) * 10));
- }
- }
-
- //get access to the Queue info
- if (DEBUG) {
- System.out.println("");
- System.out.println("Queue1 Size = " + proxyQueue1.getQueueSize());
- System.out.println("Queue1 Memory % Used = " + proxyQueue1.getMemoryPercentUsage());
- System.out.println("Queue1 Memory Available = " + proxyQueue1.getMemoryLimit());
- }
-
- // FLUSH THE QUEUE
- final CountDownLatch latch1 = new CountDownLatch(1);
- final CountDownLatch latch2 = new CountDownLatch(1);
- Connection[] consumerConnections1 = new Connection[NUM_CONSUMERS];
- List<Message> consumerList1 = new ArrayList<>();
- Connection[] consumerConnections2 = new Connection[NUM_CONSUMERS];
- Connection[] producerConnections2 = new Connection[NUM_CONSUMERS];
- List<Message> consumerList2 = new ArrayList<>();
-
- for (int ix = 0; ix < NUM_CONSUMERS; ix++) {
- producerConnections2[ix] = factory.createConnection();
- producerConnections2[ix].start();
- consumerConnections1[ix] = getConsumerConnection(factory);
- Session consumerSession = consumerConnections1[ix].createSession(TRANSACTED, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer = consumerSession.createConsumer(session.createQueue(QUEUE_1_NAME));
- consumer.setMessageListener(new SessionAwareMessageListener(producerConnections2[ix], consumerSession, QUEUE_2_NAME, latch1, consumerList1));
- }
-
- latch1.await(200000, TimeUnit.MILLISECONDS);
- if (DEBUG) {
- System.out.println("");
- System.out.println("Queue2 Size = " + proxyQueue2.getQueueSize());
- System.out.println("Queue2 Memory % Used = " + proxyQueue2.getMemoryPercentUsage());
- System.out.println("Queue2 Memory Available = " + proxyQueue2.getMemoryLimit());
- }
-
- for (int ix = 0; ix < NUM_CONSUMERS; ix++) {
- consumerConnections2[ix] = getConsumerConnection(factory);
- Session consumerSession = consumerConnections2[ix].createSession(TRANSACTED, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer = consumerSession.createConsumer(session.createQueue(QUEUE_2_NAME));
- consumer.setMessageListener(new SessionAwareMessageListener(consumerSession, latch2, consumerList2));
- }
-
- boolean success = Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- boolean done = latch2.await(10, TimeUnit.SECONDS);
- if (DEBUG) {
- System.out.println("");
- System.out.println("Queue1 Size = " + proxyQueue1.getQueueSize());
- System.out.println("Queue1 Memory % Used = " + proxyQueue1.getMemoryPercentUsage());
- System.out.println("Queue2 Size = " + proxyQueue2.getQueueSize());
- System.out.println("Queue2 Memory % Used = " + proxyQueue2.getMemoryPercentUsage());
- System.out.println("Queue2 Memory Available = " + proxyQueue2.getMemoryLimit());
- }
- return done;
- }
- }, 300 * 1000);
- if (!success) {
- dumpAllThreads("blocked waiting on 2");
- }
- assertTrue("got all expected messages on 2", success);
-
- producerConnection.close();
- for (int ix = 0; ix < NUM_CONSUMERS; ix++) {
- consumerConnections1[ix].close();
- consumerConnections2[ix].close();
- producerConnections2[ix].close();
- }
-
- //let the consumer statistics on queue2 have time to update
- Thread.sleep(500);
-
- if (DEBUG) {
- System.out.println("");
- System.out.println("Queue1 Size = " + proxyQueue1.getQueueSize());
- System.out.println("Queue1 Memory % Used = " + proxyQueue1.getMemoryPercentUsage());
- System.out.println("Queue2 Size = " + proxyQueue2.getQueueSize());
- System.out.println("Queue2 Memory % Used = " + proxyQueue2.getMemoryPercentUsage());
- }
-
- Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return 0 == proxyQueue1.getQueueSize();
- }
- });
- assertEquals("Queue1 has gone negative,", 0, proxyQueue1.getQueueSize());
-
- Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return 0 == proxyQueue2.getQueueSize();
- }
- });
- assertEquals("Queue2 has gone negative,", 0, proxyQueue2.getQueueSize());
- proxyConnection.close();
-
- }
-
- private QueueViewMBean getProxyToQueueViewMBean(Queue queue) throws MalformedObjectNameException, JMSException {
- final String prefix = "org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=";
-
- ObjectName queueViewMBeanName = new ObjectName(prefix + queue.getQueueName());
- QueueViewMBean proxy = (QueueViewMBean) broker.getManagementContext().newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
-
- return proxy;
- }
-
- protected Connection getConsumerConnection(ConnectionFactory fac) throws JMSException {
- Connection connection = fac.createConnection();
- connection.start();
- return connection;
- }
-
- @Override
- protected void setUp() throws Exception {
- if (broker == null) {
- broker = createBroker();
- }
- super.setUp();
- }
-
- @Override
- protected void tearDown() throws Exception {
- super.tearDown();
- if (broker != null) {
- broker.stop();
- broker.waitUntilStopped();
- }
- }
-
- protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
- ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(bindAddress);
- Properties props = new Properties();
- props.setProperty("prefetchPolicy.durableTopicPrefetch", "" + PREFETCH_SIZE);
- props.setProperty("prefetchPolicy.optimizeDurableTopicPrefetch", "" + PREFETCH_SIZE);
- props.setProperty("prefetchPolicy.queuePrefetch", "" + PREFETCH_SIZE);
- cf.setProperties(props);
- return cf;
- }
-
- protected BrokerService createBroker() throws Exception {
- BrokerService answer = new BrokerService();
- configureBroker(answer);
- answer.start();
- answer.waitUntilStarted();
- bindAddress = answer.getTransportConnectors().get(0).getConnectUri().toString();
- return answer;
- }
-
- protected void configureBroker(BrokerService answer) throws Exception {
- PolicyEntry policy = new PolicyEntry();
- policy.setMemoryLimit(QUEUE_MEMORY_LIMIT);
- policy.setPendingQueuePolicy(new StorePendingQueueMessageStoragePolicy());
-
- // disable the cache to be sure setBatch is the problem
- // will get lots of duplicates
- // real problem is sync between cursor and store add - leads to out or order messages
- // in the cursor so setBatch can break.
- // policy.setUseCache(false);
-
- PolicyMap pMap = new PolicyMap();
- pMap.setDefaultEntry(policy);
- answer.setDestinationPolicy(pMap);
- answer.setDeleteAllMessagesOnStartup(true);
- answer.addConnector("tcp://localhost:0");
-
- MemoryUsage memoryUsage = new MemoryUsage();
- memoryUsage.setLimit(MEMORY_USAGE);
- memoryUsage.setPercentUsageMinDelta(20);
-
- TempUsage tempUsage = new TempUsage();
- tempUsage.setLimit(TEMP_USAGE);
-
- StoreUsage storeUsage = new StoreUsage();
- storeUsage.setLimit(STORE_USAGE);
-
- SystemUsage systemUsage = new SystemUsage();
- systemUsage.setMemoryUsage(memoryUsage);
- systemUsage.setTempUsage(tempUsage);
- systemUsage.setStoreUsage(storeUsage);
- answer.setSystemUsage(systemUsage);
- }
-
- /**
- * Message listener that is given the Session for transacted consumers
- */
- class SessionAwareMessageListener implements MessageListener {
-
- private final List<Message> consumerList;
- private final CountDownLatch latch;
- private final Session consumerSession;
- private Session producerSession;
- private MessageProducer producer;
-
- public SessionAwareMessageListener(Session consumerSession, CountDownLatch latch, List<Message> consumerList) {
- this(null, consumerSession, null, latch, consumerList);
- }
-
- public SessionAwareMessageListener(Connection producerConnection,
- Session consumerSession,
- String outQueueName,
- CountDownLatch latch,
- List<Message> consumerList) {
- this.consumerList = consumerList;
- this.latch = latch;
- this.consumerSession = consumerSession;
-
- if (producerConnection != null) {
- try {
- producerSession = producerConnection.createSession(TRANSACTED, Session.AUTO_ACKNOWLEDGE);
- Destination queue = producerSession.createQueue(outQueueName);
- producer = producerSession.createProducer(queue);
- }
- catch (JMSException e) {
- e.printStackTrace();
- }
- }
- }
-
- @Override
- public void onMessage(Message msg) {
- try {
- if (producer == null) {
- // sleep to act as a slow consumer
- // which will force a mix of direct and polled dispatching
- // using the cursor on the broker
- Thread.sleep(50);
- }
- else {
- producer.send(msg);
- if (TRANSACTED)
- producerSession.commit();
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- }
-
- synchronized (consumerList) {
- consumerList.add(msg);
- if (DEBUG && consumerList.size() % 100 == 0) {
- int index = consumerList.size() / 100;
- System.out.print(index - ((index / 10) * 10));
- }
- if (consumerList.size() == MESSAGE_COUNT) {
- latch.countDown();
- }
- }
- if (TRANSACTED) {
- try {
- consumerSession.commit();
- }
- catch (JMSException e) {
- e.printStackTrace();
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/CompositeQueueTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/CompositeQueueTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/CompositeQueueTest.java
deleted file mode 100644
index 2d8adb5..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/CompositeQueueTest.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.broker.virtual;
-
-import java.net.URI;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.activemq.EmbeddedBrokerTestSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.spring.ConsumerBean;
-import org.apache.activemq.xbean.XBeanBrokerFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- *
- *
- */
-public class CompositeQueueTest extends EmbeddedBrokerTestSupport {
-
- private static final Logger LOG = LoggerFactory.getLogger(CompositeQueueTest.class);
-
- protected int total = 10;
- protected Connection connection;
- public String messageSelector1, messageSelector2 = null;
-
- public void testVirtualTopicCreation() throws Exception {
- if (connection == null) {
- connection = createConnection();
- }
- connection.start();
-
- ConsumerBean messageList1 = new ConsumerBean();
- ConsumerBean messageList2 = new ConsumerBean();
- messageList1.setVerbose(true);
- messageList2.setVerbose(true);
-
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- Destination producerDestination = getProducerDestination();
- Destination destination1 = getConsumer1Dsetination();
- Destination destination2 = getConsumer2Dsetination();
-
- LOG.info("Sending to: " + producerDestination);
- LOG.info("Consuming from: " + destination1 + " and " + destination2);
-
- MessageConsumer c1 = session.createConsumer(destination1, messageSelector1);
- MessageConsumer c2 = session.createConsumer(destination2, messageSelector2);
-
- c1.setMessageListener(messageList1);
- c2.setMessageListener(messageList2);
-
- // create topic producer
- MessageProducer producer = session.createProducer(producerDestination);
- assertNotNull(producer);
-
- for (int i = 0; i < total; i++) {
- producer.send(createMessage(session, i));
- }
-
- assertMessagesArrived(messageList1, messageList2);
- }
-
- protected void assertMessagesArrived(ConsumerBean messageList1, ConsumerBean messageList2) {
- messageList1.assertMessagesArrived(total);
- messageList2.assertMessagesArrived(total);
- }
-
- protected TextMessage createMessage(Session session, int i) throws JMSException {
- TextMessage textMessage = session.createTextMessage("message: " + i);
- if (i % 2 != 0) {
- textMessage.setStringProperty("odd", "yes");
- }
- else {
- textMessage.setStringProperty("odd", "no");
- }
- textMessage.setIntProperty("i", i);
- return textMessage;
- }
-
- protected Destination getConsumer1Dsetination() {
- return new ActiveMQQueue("FOO");
- }
-
- protected Destination getConsumer2Dsetination() {
- return new ActiveMQTopic("BAR");
- }
-
- protected Destination getProducerDestination() {
- return new ActiveMQQueue("MY.QUEUE");
- }
-
- @Override
- protected void tearDown() throws Exception {
- if (connection != null) {
- connection.close();
- }
- super.tearDown();
- }
-
- @Override
- protected BrokerService createBroker() throws Exception {
- XBeanBrokerFactory factory = new XBeanBrokerFactory();
- BrokerService answer = factory.createBroker(new URI(getBrokerConfigUri()));
- return answer;
- }
-
- protected String getBrokerConfigUri() {
- return "org/apache/activemq/broker/virtual/composite-queue.xml";
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/CompositeTopicTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/CompositeTopicTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/CompositeTopicTest.java
deleted file mode 100644
index 9ada103..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/CompositeTopicTest.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.broker.virtual;
-
-import javax.jms.Destination;
-
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQTopic;
-
-/**
- *
- *
- */
-public class CompositeTopicTest extends CompositeQueueTest {
-
- @Override
- protected Destination getConsumer1Dsetination() {
- return new ActiveMQQueue("FOO");
- }
-
- @Override
- protected Destination getConsumer2Dsetination() {
- return new ActiveMQTopic("BAR");
- }
-
- @Override
- protected Destination getProducerDestination() {
- return new ActiveMQTopic("MY.TOPIC");
- }
-
- @Override
- protected String getBrokerConfigUri() {
- return "org/apache/activemq/broker/virtual/composite-topic.xml";
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/DestinationInterceptorDurableSubTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/DestinationInterceptorDurableSubTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/DestinationInterceptorDurableSubTest.java
deleted file mode 100644
index 39e9d2a..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/DestinationInterceptorDurableSubTest.java
+++ /dev/null
@@ -1,283 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.broker.virtual;
-
-import java.io.IOException;
-import java.net.URI;
-
-import javax.jms.Connection;
-import javax.jms.Session;
-import javax.jms.Topic;
-import javax.jms.TopicSubscriber;
-import javax.management.InstanceNotFoundException;
-import javax.management.MBeanServerConnection;
-import javax.management.ObjectInstance;
-import javax.management.ObjectName;
-import javax.management.remote.JMXConnector;
-import javax.management.remote.JMXConnectorFactory;
-import javax.management.remote.JMXServiceURL;
-
-import org.apache.activemq.EmbeddedBrokerTestSupport;
-
-import org.apache.activemq.broker.Broker;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.ConnectionContext;
-import org.apache.activemq.broker.ProducerBrokerExchange;
-import org.apache.activemq.broker.region.Destination;
-import org.apache.activemq.broker.region.DestinationFilter;
-import org.apache.activemq.broker.region.DestinationInterceptor;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.Message;
-import org.apache.activemq.xbean.XBeanBrokerFactory;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Test for AMQ-4571.
- * checks that durable subscription is fully unregistered
- * when using nested destination interceptors.
- */
-public class DestinationInterceptorDurableSubTest extends EmbeddedBrokerTestSupport {
-
- private static final transient Logger LOG = LoggerFactory.getLogger(DestinationInterceptorDurableSubTest.class);
- private MBeanServerConnection mbsc = null;
- public static final String JMX_CONTEXT_BASE_NAME = "org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Topic,destinationName=";
-
- /**
- * Tests AMQ-4571.
- *
- * @throws Exception
- */
- public void testVirtualTopicRemoval() throws Exception {
-
- LOG.debug("Running testVirtualTopicRemoval()");
- String clientId1 = "myId1";
- String clientId2 = "myId2";
-
- Connection conn = null;
- Session session = null;
-
- try {
- assertTrue(broker.isStarted());
-
- // create durable sub 1
- conn = createConnection();
- conn.setClientID(clientId1);
- conn.start();
- session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- // Topic topic = session.createTopic(destination.getPhysicalName());
- TopicSubscriber sub1 = session.createDurableSubscriber((Topic) destination, clientId1);
-
- // create durable sub 2
- TopicSubscriber sub2 = session.createDurableSubscriber((Topic) destination, clientId2);
-
- // verify two subs registered in JMX
- assertSubscriptionCount(destination.getPhysicalName(), 2);
- assertTrue(isSubRegisteredInJmx(destination.getPhysicalName(), clientId1));
- assertTrue(isSubRegisteredInJmx(destination.getPhysicalName(), clientId2));
-
- // delete sub 1
- sub1.close();
- session.unsubscribe(clientId1);
-
- // verify only one sub registered in JMX
- assertSubscriptionCount(destination.getPhysicalName(), 1);
- assertFalse(isSubRegisteredInJmx(destination.getPhysicalName(), clientId1));
- assertTrue(isSubRegisteredInJmx(destination.getPhysicalName(), clientId2));
-
- // delete sub 2
- sub2.close();
- session.unsubscribe(clientId2);
-
- // verify no sub registered in JMX
- assertSubscriptionCount(destination.getPhysicalName(), 0);
- assertFalse(isSubRegisteredInJmx(destination.getPhysicalName(), clientId1));
- assertFalse(isSubRegisteredInJmx(destination.getPhysicalName(), clientId2));
- }
- finally {
- session.close();
- conn.close();
- }
- }
-
- /**
- * Connects to broker using JMX
- *
- * @return The JMX connection
- * @throws IOException in case of any errors
- */
- protected MBeanServerConnection connectJMXBroker() throws IOException {
- // connect to broker via JMX
- JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://:1299/jmxrmi");
- JMXConnector jmxc = JMXConnectorFactory.connect(url, null);
- MBeanServerConnection mbsc = jmxc.getMBeanServerConnection();
- LOG.debug("JMX connection established");
- return mbsc;
- }
-
- /**
- * Asserts that the Subscriptions JMX attribute of a topic has the expected
- * count.
- *
- * @param topicName name of the topic destination
- * @param expectedCount expected number of subscriptions
- * @return
- */
- protected boolean assertSubscriptionCount(String topicName, int expectedCount) {
- try {
- if (mbsc == null) {
- mbsc = connectJMXBroker();
- }
- // query broker queue size
- ObjectName[] tmp = (ObjectName[]) mbsc.getAttribute(new ObjectName(JMX_CONTEXT_BASE_NAME + topicName), "Subscriptions");
- assertEquals(expectedCount, tmp.length);
- }
- catch (Exception ex) {
- LOG.error(ex.getMessage());
- return false;
- }
- return true;
- }
-
- /**
- * Checks if a subscriptions for topic topicName with subName is registered in JMX
- *
- * @param topicName physical name of topic destination (excluding prefix 'topic://')
- * @param subName name of the durable subscription
- * @return true if registered, false otherwise
- */
- protected boolean isSubRegisteredInJmx(String topicName, String subName) {
-
- try {
- if (mbsc == null) {
- mbsc = connectJMXBroker();
- }
-
- // A durable sub is registered under the Subscriptions JMX attribute of the topic and
- // as its own ObjectInstance under the topic's Consumer namespace.
- // AMQ-4571 only removed the latter not the former on unsubscribe(), so we need
- // to check against both.
- ObjectName[] names = (ObjectName[]) mbsc.getAttribute(new ObjectName(JMX_CONTEXT_BASE_NAME + topicName), "Subscriptions");
- ObjectInstance instance = mbsc.getObjectInstance(new ObjectName(JMX_CONTEXT_BASE_NAME +
- topicName +
- ",endpoint=Consumer,clientId=myId1,consumerId=Durable(myId1_" +
- subName +
- ")"));
-
- if (instance == null)
- return false;
-
- for (int i = 0; i < names.length; i++) {
- if (names[i].toString().contains(subName))
- return true;
- }
- }
- catch (InstanceNotFoundException ine) {
- //this may be expected so log at info level
- LOG.info(ine.toString());
- return false;
- }
- catch (Exception ex) {
- LOG.error(ex.toString());
- return false;
- }
- return false;
- }
-
- @Override
- protected void tearDown() throws Exception {
- super.tearDown();
- }
-
- @Override
- protected BrokerService createBroker() throws Exception {
- XBeanBrokerFactory factory = new XBeanBrokerFactory();
- BrokerService answer = factory.createBroker(new URI(getBrokerConfigUri()));
-
- // lets disable persistence as we are a test
- answer.setPersistent(false);
- useTopic = true;
- return answer;
- }
-
- protected String getBrokerConfigUri() {
- return "org/apache/activemq/broker/virtual/virtual-topics-and-interceptor.xml";
- }
-
- /**
- * Simple but custom topic interceptor.
- * To be used for testing nested interceptors in conjunction with
- * virtual topic interceptor.
- */
- public static class SimpleDestinationInterceptor implements DestinationInterceptor {
-
- private final Logger LOG = LoggerFactory.getLogger(SimpleDestinationInterceptor.class);
- private BrokerService broker;
-
- public SimpleDestinationInterceptor() {
- }
-
- /* (non-Javadoc)
- * @see org.apache.activemq.broker.BrokerServiceAware#setBrokerService(org.apache.activemq.broker.BrokerService)
- */
- public void setBrokerService(BrokerService brokerService) {
- LOG.info("setBrokerService()");
- this.broker = brokerService;
- }
-
- /* (non-Javadoc)
- * @see org.apache.activemq.broker.region.DestinationInterceptor#intercept(org.apache.activemq.broker.region.Destination)
- */
- @Override
- public Destination intercept(final Destination destination) {
- LOG.info("intercept({})", destination.getName());
-
- if (!destination.getActiveMQDestination().getPhysicalName().startsWith("ActiveMQ")) {
- return new DestinationFilter(destination) {
- @Override
- public void send(ProducerBrokerExchange context, Message message) throws Exception {
- // Send message to Destination
- if (LOG.isDebugEnabled()) {
- LOG.debug("SimpleDestinationInterceptor: Sending message to destination:" + this.getActiveMQDestination().getPhysicalName());
- }
- // message.setDestination(destination.getActiveMQDestination());
- super.send(context, message);
- }
- };
- }
- return destination;
- }
-
- /* (non-Javadoc)
- * @see org.apache.activemq.broker.region.DestinationInterceptor#remove(org.apache.activemq.broker.region.Destination)
- */
- @Override
- public void remove(Destination destination) {
- LOG.info("remove({})", destination.getName());
- this.broker = null;
- }
-
- /* (non-Javadoc)
- * @see org.apache.activemq.broker.region.DestinationInterceptor#create(org.apache.activemq.broker.Broker, org.apache.activemq.broker.ConnectionContext, org.apache.activemq.command.ActiveMQDestination)
- */
- @Override
- public void create(Broker broker, ConnectionContext context, ActiveMQDestination destination) throws Exception {
- LOG.info("create(" + broker.getBrokerName() + ", " + context.toString() + ", " + destination.getPhysicalName());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/FilteredQueueTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/FilteredQueueTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/FilteredQueueTest.java
deleted file mode 100644
index e91ae4b..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/FilteredQueueTest.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.broker.virtual;
-
-import org.apache.activemq.spring.ConsumerBean;
-
-/**
- *
- */
-public class FilteredQueueTest extends CompositeQueueTest {
-
- @Override
- protected String getBrokerConfigUri() {
- return "org/apache/activemq/broker/virtual/filtered-queue.xml";
- }
-
- @Override
- protected void assertMessagesArrived(ConsumerBean messageList1, ConsumerBean messageList2) {
- messageList1.assertMessagesArrived(total / 2);
- messageList2.assertMessagesArrived(1);
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueCorrectMemoryUsageTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueCorrectMemoryUsageTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueCorrectMemoryUsageTest.java
deleted file mode 100644
index 5ca00b7..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueCorrectMemoryUsageTest.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.broker.virtual;
-
-import java.util.Arrays;
-import java.util.LinkedList;
-import java.util.List;
-
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import org.apache.activemq.EmbeddedBrokerTestSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.DestinationInterceptor;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.broker.region.virtual.MirroredQueue;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
-import org.apache.activemq.usage.MemoryUsage;
-import org.apache.activemq.usage.StoreUsage;
-import org.apache.activemq.usage.SystemUsage;
-import org.apache.activemq.usage.TempUsage;
-import org.apache.activemq.util.IOHelper;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.util.Assert;
-
-/**
- * This test will determine that the producer flow control does not kick in.
- * The original MirroredQueue implementation was causing the queue to update
- * the topic memory usage instead of the queue memory usage.
- * The reason is that the message memory usage instance will not be updated
- * unless it is null. This was the case when the message was initially sent
- * to the topic but then it was non-null when it was being sent to the queue.
- * When the region destination was set, the associated memory usage was not
- * updated to the passed queue destination and thus the memory usage of the
- * topic was being updated instead.
- *
- * @author Claudio Corsi
- */
-public class MirroredQueueCorrectMemoryUsageTest extends EmbeddedBrokerTestSupport {
-
- private static final Logger logger = LoggerFactory.getLogger(MirroredQueueCorrectMemoryUsageTest.class);
-
- private static final long ONE_MB = 0x0100000;
- private static final long TEN_MB = ONE_MB * 10;
- private static final long TWENTY_MB = TEN_MB * 2;
-
- private static final String CREATED_STATIC_FOR_PERSISTENT = "created.static.for.persistent";
-
- @Override
- protected boolean isPersistent() {
- return true;
- }
-
- @Override
- protected BrokerService createBroker() throws Exception {
- // Create the broker service instance....
- BrokerService broker = super.createBroker();
- // Create and add the mirrored queue destination interceptor ....
- DestinationInterceptor[] destinationInterceptors = new DestinationInterceptor[1];
- MirroredQueue mq = new MirroredQueue();
- mq.setCopyMessage(true);
- mq.setPrefix("");
- mq.setPostfix(".qmirror");
- destinationInterceptors[0] = mq;
- broker.setDestinationInterceptors(destinationInterceptors);
- // Create the destination policy for the topics and queues
- PolicyMap policyMap = new PolicyMap();
- List<PolicyEntry> entries = new LinkedList<>();
- // Create Topic policy entry
- PolicyEntry policyEntry = new PolicyEntry();
- super.useTopic = true;
- ActiveMQDestination destination = super.createDestination(">");
- Assert.isTrue(destination.isTopic(), "Created destination was not a topic");
- policyEntry.setDestination(destination);
- policyEntry.setProducerFlowControl(true);
- policyEntry.setMemoryLimit(ONE_MB); // x10
- entries.add(policyEntry);
- // Create Queue policy entry
- policyEntry = new PolicyEntry();
- super.useTopic = false;
- destination = super.createDestination(CREATED_STATIC_FOR_PERSISTENT);
- Assert.isTrue(destination.isQueue(), "Created destination was not a queue");
- policyEntry.setDestination(destination);
- policyEntry.setProducerFlowControl(true);
- policyEntry.setMemoryLimit(TEN_MB);
- entries.add(policyEntry);
- policyMap.setPolicyEntries(entries);
- broker.setDestinationPolicy(policyMap);
- // Set destinations
- broker.setDestinations(new ActiveMQDestination[]{destination});
- // Set system usage
- SystemUsage memoryManager = new SystemUsage();
- MemoryUsage memoryUsage = new MemoryUsage();
- memoryUsage.setLimit(TEN_MB);
- memoryManager.setMemoryUsage(memoryUsage);
- StoreUsage storeUsage = new StoreUsage();
- storeUsage.setLimit(TWENTY_MB);
- memoryManager.setStoreUsage(storeUsage);
- TempUsage tempDiskUsage = new TempUsage();
- tempDiskUsage.setLimit(TEN_MB);
- memoryManager.setTempUsage(tempDiskUsage);
- broker.setSystemUsage(memoryManager);
- // Set the persistent adapter
- KahaDBPersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter();
- persistenceAdapter.setJournalMaxFileLength((int) TEN_MB);
- // Delete all current messages...
- IOHelper.deleteFile(persistenceAdapter.getDirectory());
- broker.setPersistenceAdapter(persistenceAdapter);
- return broker;
- }
-
- @Override
- @Before
- protected void setUp() throws Exception {
- super.setUp();
- }
-
- @Override
- @After
- protected void tearDown() throws Exception {
- super.tearDown();
- }
-
- @Test(timeout = 40000)
- public void testNoMemoryUsageIncreaseForTopic() throws Exception {
- Connection connection = super.createConnection();
- connection.start();
- Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- Destination destination = session.createQueue(CREATED_STATIC_FOR_PERSISTENT);
- MessageProducer producer = session.createProducer(destination);
- producer.setDeliveryMode(DeliveryMode.PERSISTENT);
- char[] m = new char[1024];
- Arrays.fill(m, 'x');
- // create some messages that have 1k each
- for (int i = 1; i < 12000; i++) {
- producer.send(session.createTextMessage(new String(m)));
- logger.debug("Sent message: " + i);
- }
- producer.close();
- session.close();
- connection.stop();
- connection.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueTest.java
deleted file mode 100644
index 127f04c..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueTest.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.broker.virtual;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TemporaryQueue;
-
-import org.apache.activemq.EmbeddedBrokerTestSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.RegionBroker;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.spring.ConsumerBean;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- *
- */
-public class MirroredQueueTest extends EmbeddedBrokerTestSupport {
-
- private static final transient Logger LOG = LoggerFactory.getLogger(MirroredQueueTest.class);
- private Connection connection;
-
- public void testSendingToQueueIsMirrored() throws Exception {
- if (connection == null) {
- connection = createConnection();
- }
- connection.start();
-
- ConsumerBean messageList = new ConsumerBean();
- messageList.setVerbose(true);
-
- Destination consumeDestination = createConsumeDestination();
-
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- LOG.info("Consuming from: " + consumeDestination);
-
- MessageConsumer c1 = session.createConsumer(consumeDestination);
- c1.setMessageListener(messageList);
-
- // create topic producer
- ActiveMQQueue sendDestination = new ActiveMQQueue(getQueueName());
- LOG.info("Sending to: " + sendDestination);
-
- MessageProducer producer = session.createProducer(sendDestination);
- assertNotNull(producer);
-
- int total = 10;
- for (int i = 0; i < total; i++) {
- producer.send(session.createTextMessage("message: " + i));
- }
-
- ///Thread.sleep(1000000);
-
- messageList.assertMessagesArrived(total);
-
- LOG.info("Received: " + messageList);
- }
-
- public void testTempMirroredQueuesClearDown() throws Exception {
- if (connection == null) {
- connection = createConnection();
- }
- connection.start();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- TemporaryQueue tempQueue = session.createTemporaryQueue();
- RegionBroker rb = (RegionBroker) broker.getBroker().getAdaptor(RegionBroker.class);
- assertTrue(rb.getDestinationMap().size() == 5);
- tempQueue.delete();
- assertTrue(rb.getDestinationMap().size() == 4);
- }
-
- protected Destination createConsumeDestination() {
- return new ActiveMQTopic("VirtualTopic.Mirror." + getQueueName());
- }
-
- protected String getQueueName() {
- return "My.Queue";
- }
-
- @Override
- protected BrokerService createBroker() throws Exception {
- BrokerService answer = new BrokerService();
- answer.setUseMirroredQueues(true);
- answer.setPersistent(isPersistent());
- answer.addConnector(bindAddress);
- return answer;
- }
-
- @Override
- protected void tearDown() throws Exception {
- if (connection != null) {
- connection.close();
- }
- super.tearDown();
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueUsingVirtualTopicQueueTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueUsingVirtualTopicQueueTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueUsingVirtualTopicQueueTest.java
deleted file mode 100644
index 6acaad1..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueUsingVirtualTopicQueueTest.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.broker.virtual;
-
-import javax.jms.Destination;
-
-import org.apache.activemq.command.ActiveMQQueue;
-
-/**
- *
- *
- */
-public class MirroredQueueUsingVirtualTopicQueueTest extends MirroredQueueTest {
-
- @Override
- protected Destination createConsumeDestination() {
- String queueName = "Consumer.A.VirtualTopic.Mirror." + getQueueName();
- return new ActiveMQQueue(queueName);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualDestPerfTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualDestPerfTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualDestPerfTest.java
deleted file mode 100644
index 85e14c7..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualDestPerfTest.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.broker.virtual;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.LinkedHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.management.ObjectName;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.jmx.QueueViewMBean;
-import org.apache.activemq.broker.region.DestinationInterceptor;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.broker.region.virtual.CompositeTopic;
-import org.apache.activemq.broker.region.virtual.VirtualDestination;
-import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
-import org.apache.activemq.command.ActiveMQBytesMessage;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
-import org.apache.activemq.util.ByteSequence;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class VirtualDestPerfTest {
-
- private static final Logger LOG = LoggerFactory.getLogger(VirtualDestPerfTest.class);
- public int messageSize = 5 * 1024;
- public int messageCount = 10000;
- ActiveMQTopic target = new ActiveMQTopic("target");
- BrokerService brokerService;
- ActiveMQConnectionFactory connectionFactory;
-
- @Test
- @Ignore("comparison test - 'new' no wait on future with async send broker side is always on")
- public void testAsyncSendBurstToFillCache() throws Exception {
- startBroker(4, true, true);
- connectionFactory.setUseAsyncSend(true);
-
- // a burst of messages to fill the cache
- messageCount = 22000;
- messageSize = 10 * 1024;
-
- LinkedHashMap<Integer, Long> results = new LinkedHashMap<>();
-
- final ActiveMQQueue queue = new ActiveMQQueue("targetQ");
- for (Integer numThreads : new Integer[]{1, 2}) {
- ExecutorService executor = Executors.newFixedThreadPool(numThreads);
- final AtomicLong numMessagesToSend = new AtomicLong(messageCount);
- purge();
- long startTime = System.currentTimeMillis();
- for (int i = 0; i < numThreads; i++) {
- executor.execute(new Runnable() {
- @Override
- public void run() {
- try {
- produceMessages(numMessagesToSend, queue);
- }
- catch (Exception e) {
- e.printStackTrace();
- }
- }
- });
- }
- executor.shutdown();
- executor.awaitTermination(5, TimeUnit.MINUTES);
- long endTime = System.currentTimeMillis();
- long seconds = (endTime - startTime) / 1000;
- LOG.info("For numThreads {} duration {}", numThreads.intValue(), seconds);
- results.put(numThreads, seconds);
- LOG.info("Broker got {} messages", brokerService.getAdminView().getTotalEnqueueCount());
- }
-
- brokerService.stop();
- brokerService.waitUntilStopped();
- LOG.info("Results: {}", results);
- }
-
- private void purge() throws Exception {
- ObjectName[] queues = brokerService.getAdminView().getQueues();
- if (queues.length == 1) {
- QueueViewMBean queueViewMBean = (QueueViewMBean) brokerService.getManagementContext().newProxyInstance(queues[0], QueueViewMBean.class, false);
- queueViewMBean.purge();
- }
- }
-
- @Test
- @Ignore("comparison test - takes too long and really needs a peek at the graph")
- public void testPerf() throws Exception {
- LinkedHashMap<Integer, Long> resultsT = new LinkedHashMap<>();
- LinkedHashMap<Integer, Long> resultsF = new LinkedHashMap<>();
-
- for (int i = 2; i < 11; i++) {
- for (Boolean concurrent : new Boolean[]{true, false}) {
- startBroker(i, concurrent, false);
-
- long startTime = System.currentTimeMillis();
- produceMessages(new AtomicLong(messageCount), target);
- long endTime = System.currentTimeMillis();
- long seconds = (endTime - startTime) / 1000;
- LOG.info("For routes {} duration {}", i, seconds);
- if (concurrent) {
- resultsT.put(i, seconds);
- }
- else {
- resultsF.put(i, seconds);
- }
- brokerService.stop();
- brokerService.waitUntilStopped();
- }
- }
- LOG.info("results T{} F{}", resultsT, resultsF);
- LOG.info("http://www.chartgo.com/samples.do?chart=line&border=1&show3d=0&width=600&height=500&roundedge=1&transparency=1&legend=1&title=Send:10k::Concurrent-v-Serial&xtitle=routes&ytitle=Duration(seconds)&chrtbkgndcolor=white&threshold=0.0&lang=en" + "&xaxis1=" + toStr(resultsT.keySet()) + "&yaxis1=" + toStr(resultsT.values()) + "&group1=concurrent" + "&xaxis2=" + toStr(resultsF.keySet()) + "&yaxis2=" + toStr(resultsF.values()) + "&group2=serial" + "&from=linejsp");
- }
-
- private String toStr(Collection set) {
- return set.toString().replace(",", "%0D%0A").replace("[", "").replace("]", "").replace(" ", "");
- }
-
- protected void produceMessages(AtomicLong messageCount, ActiveMQDestination destination) throws Exception {
- final ByteSequence payLoad = new ByteSequence(new byte[messageSize]);
- Connection connection = connectionFactory.createConnection();
- MessageProducer messageProducer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE).createProducer(destination);
- messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
- ActiveMQBytesMessage message = new ActiveMQBytesMessage();
- message.setContent(payLoad);
- while (messageCount.decrementAndGet() >= 0) {
- messageProducer.send(message);
- }
- connection.close();
- }
-
- private void startBroker(int fanoutCount,
- boolean concurrentSend,
- boolean concurrentStoreAndDispatchQueues) throws Exception {
- brokerService = new BrokerService();
- brokerService.setDeleteAllMessagesOnStartup(true);
- brokerService.setUseVirtualTopics(true);
- brokerService.addConnector("tcp://0.0.0.0:0");
- brokerService.setAdvisorySupport(false);
- PolicyMap destPolicyMap = new PolicyMap();
- PolicyEntry defaultEntry = new PolicyEntry();
- defaultEntry.setExpireMessagesPeriod(0);
- defaultEntry.setOptimizedDispatch(true);
- defaultEntry.setCursorMemoryHighWaterMark(110);
- destPolicyMap.setDefaultEntry(defaultEntry);
- brokerService.setDestinationPolicy(destPolicyMap);
-
- CompositeTopic route = new CompositeTopic();
- route.setName("target");
- route.setForwardOnly(true);
- route.setConcurrentSend(concurrentSend);
- Collection<ActiveMQQueue> routes = new ArrayList<>();
- for (int i = 0; i < fanoutCount; i++) {
- routes.add(new ActiveMQQueue("route." + i));
- }
- route.setForwardTo(routes);
- VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor();
- interceptor.setVirtualDestinations(new VirtualDestination[]{route});
- brokerService.setDestinationInterceptors(new DestinationInterceptor[]{interceptor});
- brokerService.start();
-
- connectionFactory = new ActiveMQConnectionFactory(brokerService.getTransportConnectors().get(0).getPublishableConnectString());
- connectionFactory.setWatchTopicAdvisories(false);
- if (brokerService.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) {
-
- //with parallel sends and no consumers, concurrentStoreAnd dispatch, which uses a single thread by default
- // will stop/impeed write batching. The num threads will need tweaking when consumers are in the mix but may introduce
- // order issues
- ((KahaDBPersistenceAdapter) brokerService.getPersistenceAdapter()).setConcurrentStoreAndDispatchQueues(concurrentStoreAndDispatchQueues);
- }
- }
-}