You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2007/03/08 06:24:51 UTC
svn commit: r515927 - in /activemq/branches/activemq-4.1/activemq-core/src:
main/java/org/apache/activemq/broker/util/ test/java/org/apache/activemq/
test/java/org/apache/activemq/command/
Author: chirino
Date: Wed Mar 7 21:24:44 2007
New Revision: 515927
URL: http://svn.apache.org/viewvc?view=rev&rev=515927
Log:
Set the eol-style to native
Modified:
activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/util/TimeStampingBrokerPlugin.java (props changed)
activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/AMQDeadlockTest3.java (contents, props changed)
activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java (contents, props changed)
activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/TimeStampTest.java (props changed)
activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/command/MessageCompressionTest.java (props changed)
Propchange: activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/util/TimeStampingBrokerPlugin.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/AMQDeadlockTest3.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/AMQDeadlockTest3.java?view=diff&rev=515927&r1=515926&r2=515927
==============================================================================
--- activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/AMQDeadlockTest3.java (original)
+++ activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/AMQDeadlockTest3.java Wed Mar 7 21:24:44 2007
@@ -1,439 +1,439 @@
-package org.apache.activemq;
-
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Random;
-
-import javax.jms.BytesMessage;
-import javax.jms.ConnectionFactory;
-import javax.jms.DeliveryMode;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageListener;
-import javax.jms.Session;
-
-import junit.framework.Assert;
-import junit.framework.TestCase;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.TransportConnector;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.memory.UsageManager;
-import org.apache.activemq.network.DiscoveryNetworkConnector;
-import org.apache.activemq.network.NetworkConnector;
-import org.apache.activemq.pool.PooledConnectionFactory;
-import org.springframework.jms.core.JmsTemplate;
-import org.springframework.jms.core.MessageCreator;
-import org.springframework.jms.listener.DefaultMessageListenerContainer;
-
-import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
-import edu.emory.mathcs.backport.java.util.concurrent.ExecutorService;
-import edu.emory.mathcs.backport.java.util.concurrent.Executors;
-import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
-import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
-
-public class AMQDeadlockTest3 extends TestCase {
-
- private static final String URL1 = "tcp://localhost:61616";
-
- private static final String URL2 = "tcp://localhost:61617";
-
- private static final String QUEUE1_NAME = "test.queue.1";
-
- private static final String QUEUE2_NAME = "test.queue.2";
-
- private static final int MAX_CONSUMERS = 1;
-
- private static final int MAX_PRODUCERS = 1;
-
- private static final int NUM_MESSAGE_TO_SEND = 10;
-
- private AtomicInteger messageCount = new AtomicInteger();
- private CountDownLatch doneLatch;
-
- public void setUp() throws Exception {
- }
-
- public void tearDown() throws Exception {
- }
-
- // This should fail with incubator-activemq-fuse-4.1.0.5
- public void testQueueLimitsWithOneBrokerSameConnection() throws Exception {
-
- BrokerService brokerService1 = null;
- ActiveMQConnectionFactory acf = null;
- PooledConnectionFactory pcf = null;
- DefaultMessageListenerContainer container1 = null;
-
- try {
- brokerService1 = createBrokerService("broker1", URL1, null);
- brokerService1.start();
-
- acf = createConnectionFactory(URL1);
- pcf = new PooledConnectionFactory(acf);
-
- // Only listen on the first queue.. let the 2nd queue fill up.
- doneLatch = new CountDownLatch(NUM_MESSAGE_TO_SEND);
- container1 = createDefaultMessageListenerContainer(acf, new TestMessageListener1(500), QUEUE1_NAME);
- container1.afterPropertiesSet();
-
- Thread.sleep(2000);
-
- final ExecutorService executor = Executors.newCachedThreadPool();
- for (int i = 0; i < MAX_PRODUCERS; i++) {
- executor.submit(new PooledProducerTask(pcf, QUEUE2_NAME));
- Thread.sleep(1000);
- executor.submit(new PooledProducerTask(pcf, QUEUE1_NAME));
- }
-
- // Wait for all message to arrive.
- assertTrue(doneLatch.await(20, TimeUnit.SECONDS));
- executor.shutdownNow();
-
- Assert.assertEquals(NUM_MESSAGE_TO_SEND, messageCount.get());
-
- } finally {
-
- container1.stop();
- container1.destroy();
- container1 = null;
- brokerService1.stop();
- brokerService1 = null;
-
- }
-
- }
-
-
-
-
- // This should fail with incubator-activemq-fuse-4.1.0.5
- public void testQueueLimitsWithTwoBrokerProduceandConsumeonDifferentBrokersWithOneConnectionForProducing()
- throws Exception {
-
- BrokerService brokerService1 = null;
- BrokerService brokerService2 = null;
- ActiveMQConnectionFactory acf1 = null;
- ActiveMQConnectionFactory acf2 = null;
- PooledConnectionFactory pcf = null;
- DefaultMessageListenerContainer container1 = null;
-
- try {
- brokerService1 = createBrokerService("broker1", URL1, URL2);
- brokerService1.start();
- brokerService2 = createBrokerService("broker2", URL2, URL1);
- brokerService2.start();
-
- acf1 = createConnectionFactory(URL1);
- acf2 = createConnectionFactory(URL2);
-
- pcf = new PooledConnectionFactory(acf1);
-
- Thread.sleep(1000);
-
- doneLatch = new CountDownLatch(MAX_PRODUCERS * NUM_MESSAGE_TO_SEND);
- container1 = createDefaultMessageListenerContainer(acf2, new TestMessageListener1(500), QUEUE1_NAME);
- container1.afterPropertiesSet();
-
- final ExecutorService executor = Executors.newCachedThreadPool();
- for (int i = 0; i < MAX_PRODUCERS; i++) {
- executor.submit(new PooledProducerTask(pcf, QUEUE2_NAME));
- Thread.sleep(1000);
- executor.submit(new PooledProducerTask(pcf, QUEUE1_NAME));
- }
-
- assertTrue(doneLatch.await(20, TimeUnit.SECONDS));
- executor.shutdownNow();
-
- Assert.assertEquals(MAX_PRODUCERS * NUM_MESSAGE_TO_SEND,
- messageCount.get());
- } finally {
-
- container1.stop();
- container1.destroy();
- container1 = null;
-
- brokerService1.stop();
- brokerService1 = null;
- brokerService2.stop();
- brokerService2 = null;
- }
- }
-
-
- // This should fail with incubator-activemq-fuse-4.1.0.5
- public void testQueueLimitsWithTwoBrokerProduceandConsumeonDifferentBrokersWithSeperateConnectionsForProducing()
- throws Exception {
-
- BrokerService brokerService1 = null;
- BrokerService brokerService2 = null;
- ActiveMQConnectionFactory acf1 = null;
- ActiveMQConnectionFactory acf2 = null;
- DefaultMessageListenerContainer container1 = null;
- DefaultMessageListenerContainer container2 = null;
-
- try {
- brokerService1 = createBrokerService("broker1", URL1, URL2);
- brokerService1.start();
- brokerService2 = createBrokerService("broker2", URL2, URL1);
- brokerService2.start();
-
- acf1 = createConnectionFactory(URL1);
- acf2 = createConnectionFactory(URL2);
-
- Thread.sleep(1000);
-
- doneLatch = new CountDownLatch(NUM_MESSAGE_TO_SEND*MAX_PRODUCERS);
-
- container1 = createDefaultMessageListenerContainer(acf2, new TestMessageListener1(500), QUEUE1_NAME);
- container1.afterPropertiesSet();
- container2 = createDefaultMessageListenerContainer(acf2, new TestMessageListener1(30000), QUEUE2_NAME);
- container2.afterPropertiesSet();
-
- final ExecutorService executor = Executors.newCachedThreadPool();
- for (int i = 0; i < MAX_PRODUCERS; i++) {
- executor.submit(new NonPooledProducerTask(acf1, QUEUE2_NAME));
- Thread.sleep(1000);
- executor.submit(new NonPooledProducerTask(acf1, QUEUE1_NAME));
- }
-
- assertTrue(doneLatch.await(20, TimeUnit.SECONDS));
- executor.shutdownNow();
-
- Assert.assertEquals(MAX_PRODUCERS * NUM_MESSAGE_TO_SEND, messageCount.get());
- } finally {
-
- container1.stop();
- container1.destroy();
- container1 = null;
-
- container2.stop();
- container2.destroy();
- container2 = null;
-
- brokerService1.stop();
- brokerService1 = null;
- brokerService2.stop();
- brokerService2 = null;
- }
- }
-
-
-
-
- private BrokerService createBrokerService(final String brokerName,
- final String uri1, final String uri2) throws Exception {
- final BrokerService brokerService = new BrokerService();
-
- brokerService.setBrokerName(brokerName);
- brokerService.setPersistent(false);
- brokerService.setUseJmx(true);
-
- final UsageManager memoryManager = new UsageManager();
- memoryManager.setLimit(5000000);
- brokerService.setMemoryManager(memoryManager);
-
- final ArrayList policyEntries = new ArrayList();
-
- final PolicyEntry entry = new PolicyEntry();
- entry.setQueue(">");
- // entry.setQueue(QUEUE1_NAME);
- entry.setMemoryLimit(1000);
- policyEntries.add(entry);
-
- final PolicyMap policyMap = new PolicyMap();
- policyMap.setPolicyEntries(policyEntries);
- brokerService.setDestinationPolicy(policyMap);
-
- final TransportConnector tConnector = new TransportConnector();
- tConnector.setUri(new URI(uri1));
- tConnector.setBrokerName(brokerName);
- tConnector.setName(brokerName + ".transportConnector");
- brokerService.addConnector(tConnector);
-
- if (uri2 != null) {
- final NetworkConnector nc = new DiscoveryNetworkConnector(new URI("static:" + uri2));
- nc.setBridgeTempDestinations(true);
- nc.setBrokerName(brokerName);
- nc.setName(brokerName + ".nc");
- brokerService.addNetworkConnector(nc);
- }
-
- return brokerService;
-
- }
-
- public DefaultMessageListenerContainer createDefaultMessageListenerContainer(
- final ConnectionFactory acf, final MessageListener listener,
- final String queue) {
- final DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
- container.setConnectionFactory(acf);
- container.setDestinationName(queue);
- container.setMessageListener(listener);
- container.setSessionTransacted(false);
- container.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
- container.setConcurrentConsumers(MAX_CONSUMERS);
- return container;
- }
-
- public ActiveMQConnectionFactory createConnectionFactory(final String url) {
- final ActiveMQConnectionFactory acf = new ActiveMQConnectionFactory(url);
- acf.setCopyMessageOnSend(false);
- acf.setUseAsyncSend(false);
- acf.setDispatchAsync(true);
- acf.setUseCompression(false);
- acf.setOptimizeAcknowledge(false);
- acf.setOptimizedMessageDispatch(true);
- acf.setUseSyncSend(true);
- return acf;
- }
-
- private class TestMessageListener1 implements MessageListener {
-
- private final long waitTime;
-
- public TestMessageListener1(long waitTime) {
- this.waitTime = waitTime;
-
- }
-
- public void onMessage(Message msg) {
-
- try {
- System.out.println("Listener1 Consumed message "+ msg.getIntProperty("count"));
-
- messageCount.incrementAndGet();
- doneLatch.countDown();
-
- Thread.sleep(waitTime);
- } catch (JMSException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
-
- }
- }
-
-
- private class PooledProducerTask implements Runnable {
-
- private final String queueName;
-
- private final PooledConnectionFactory pcf;
-
- public PooledProducerTask(final PooledConnectionFactory pcf,
- final String queueName) {
- this.pcf = pcf;
- this.queueName = queueName;
- }
-
- public void run() {
-
- try {
-
- final JmsTemplate jmsTemplate = new JmsTemplate(pcf);
- jmsTemplate.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- jmsTemplate.setExplicitQosEnabled(true);
- jmsTemplate.setMessageIdEnabled(false);
- jmsTemplate.setMessageTimestampEnabled(false);
- jmsTemplate.afterPropertiesSet();
-
- final byte[] bytes = new byte[2048];
- final Random r = new Random();
- r.nextBytes(bytes);
-
- Thread.sleep(2000);
-
- final AtomicInteger count = new AtomicInteger();
- for (int i = 0; i < NUM_MESSAGE_TO_SEND; i++) {
- jmsTemplate.send(queueName, new MessageCreator() {
-
- public Message createMessage(Session session)
- throws JMSException {
-
- final BytesMessage message = session.createBytesMessage();
-
- message.writeBytes(bytes);
- message.setIntProperty("count", count.incrementAndGet());
- message.setStringProperty("producer", "pooled");
- return message;
- }
- });
-
- System.out.println("PooledProducer sent message: "+ count.get());
- // Thread.sleep(1000);
- }
-
- } catch (final Throwable e) {
- System.err.println("Producer 1 is exiting.");
- e.printStackTrace();
- }
- }
- }
-
-
- private class NonPooledProducerTask implements Runnable {
-
- private final String queueName;
-
- private final ConnectionFactory cf;
-
- public NonPooledProducerTask(final ConnectionFactory cf,
- final String queueName) {
- this.cf = cf;
- this.queueName = queueName;
- }
-
- public void run() {
-
- try {
-
- final JmsTemplate jmsTemplate = new JmsTemplate(cf);
- jmsTemplate.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- jmsTemplate.setExplicitQosEnabled(true);
- jmsTemplate.setMessageIdEnabled(false);
- jmsTemplate.setMessageTimestampEnabled(false);
- jmsTemplate.afterPropertiesSet();
-
- final byte[] bytes = new byte[2048];
- final Random r = new Random();
- r.nextBytes(bytes);
-
- Thread.sleep(2000);
-
- final AtomicInteger count = new AtomicInteger();
- for (int i = 0; i < NUM_MESSAGE_TO_SEND; i++) {
- jmsTemplate.send(queueName, new MessageCreator() {
-
- public Message createMessage(Session session)
- throws JMSException {
-
- final BytesMessage message = session
- .createBytesMessage();
-
- message.writeBytes(bytes);
- message.setIntProperty("count", count
- .incrementAndGet());
- message.setStringProperty("producer", "non-pooled");
- return message;
- }
- });
-
- System.out.println("Non-PooledProducer sent message: " + count.get());
-
- // Thread.sleep(1000);
- }
-
- } catch (final Throwable e) {
- System.err.println("Producer 1 is exiting.");
- e.printStackTrace();
- }
- }
- }
-
-}
+package org.apache.activemq;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Random;
+
+import javax.jms.BytesMessage;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.memory.UsageManager;
+import org.apache.activemq.network.DiscoveryNetworkConnector;
+import org.apache.activemq.network.NetworkConnector;
+import org.apache.activemq.pool.PooledConnectionFactory;
+import org.springframework.jms.core.JmsTemplate;
+import org.springframework.jms.core.MessageCreator;
+import org.springframework.jms.listener.DefaultMessageListenerContainer;
+
+import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
+import edu.emory.mathcs.backport.java.util.concurrent.ExecutorService;
+import edu.emory.mathcs.backport.java.util.concurrent.Executors;
+import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
+
+public class AMQDeadlockTest3 extends TestCase {
+
+ private static final String URL1 = "tcp://localhost:61616";
+
+ private static final String URL2 = "tcp://localhost:61617";
+
+ private static final String QUEUE1_NAME = "test.queue.1";
+
+ private static final String QUEUE2_NAME = "test.queue.2";
+
+ private static final int MAX_CONSUMERS = 1;
+
+ private static final int MAX_PRODUCERS = 1;
+
+ private static final int NUM_MESSAGE_TO_SEND = 10;
+
+ private AtomicInteger messageCount = new AtomicInteger();
+ private CountDownLatch doneLatch;
+
+ public void setUp() throws Exception {
+ }
+
+ public void tearDown() throws Exception {
+ }
+
+ // This should fail with incubator-activemq-fuse-4.1.0.5
+ public void testQueueLimitsWithOneBrokerSameConnection() throws Exception {
+
+ BrokerService brokerService1 = null;
+ ActiveMQConnectionFactory acf = null;
+ PooledConnectionFactory pcf = null;
+ DefaultMessageListenerContainer container1 = null;
+
+ try {
+ brokerService1 = createBrokerService("broker1", URL1, null);
+ brokerService1.start();
+
+ acf = createConnectionFactory(URL1);
+ pcf = new PooledConnectionFactory(acf);
+
+ // Only listen on the first queue.. let the 2nd queue fill up.
+ doneLatch = new CountDownLatch(NUM_MESSAGE_TO_SEND);
+ container1 = createDefaultMessageListenerContainer(acf, new TestMessageListener1(500), QUEUE1_NAME);
+ container1.afterPropertiesSet();
+
+ Thread.sleep(2000);
+
+ final ExecutorService executor = Executors.newCachedThreadPool();
+ for (int i = 0; i < MAX_PRODUCERS; i++) {
+ executor.submit(new PooledProducerTask(pcf, QUEUE2_NAME));
+ Thread.sleep(1000);
+ executor.submit(new PooledProducerTask(pcf, QUEUE1_NAME));
+ }
+
+ // Wait for all message to arrive.
+ assertTrue(doneLatch.await(20, TimeUnit.SECONDS));
+ executor.shutdownNow();
+
+ Assert.assertEquals(NUM_MESSAGE_TO_SEND, messageCount.get());
+
+ } finally {
+
+ container1.stop();
+ container1.destroy();
+ container1 = null;
+ brokerService1.stop();
+ brokerService1 = null;
+
+ }
+
+ }
+
+
+
+
+ // This should fail with incubator-activemq-fuse-4.1.0.5
+ public void testQueueLimitsWithTwoBrokerProduceandConsumeonDifferentBrokersWithOneConnectionForProducing()
+ throws Exception {
+
+ BrokerService brokerService1 = null;
+ BrokerService brokerService2 = null;
+ ActiveMQConnectionFactory acf1 = null;
+ ActiveMQConnectionFactory acf2 = null;
+ PooledConnectionFactory pcf = null;
+ DefaultMessageListenerContainer container1 = null;
+
+ try {
+ brokerService1 = createBrokerService("broker1", URL1, URL2);
+ brokerService1.start();
+ brokerService2 = createBrokerService("broker2", URL2, URL1);
+ brokerService2.start();
+
+ acf1 = createConnectionFactory(URL1);
+ acf2 = createConnectionFactory(URL2);
+
+ pcf = new PooledConnectionFactory(acf1);
+
+ Thread.sleep(1000);
+
+ doneLatch = new CountDownLatch(MAX_PRODUCERS * NUM_MESSAGE_TO_SEND);
+ container1 = createDefaultMessageListenerContainer(acf2, new TestMessageListener1(500), QUEUE1_NAME);
+ container1.afterPropertiesSet();
+
+ final ExecutorService executor = Executors.newCachedThreadPool();
+ for (int i = 0; i < MAX_PRODUCERS; i++) {
+ executor.submit(new PooledProducerTask(pcf, QUEUE2_NAME));
+ Thread.sleep(1000);
+ executor.submit(new PooledProducerTask(pcf, QUEUE1_NAME));
+ }
+
+ assertTrue(doneLatch.await(20, TimeUnit.SECONDS));
+ executor.shutdownNow();
+
+ Assert.assertEquals(MAX_PRODUCERS * NUM_MESSAGE_TO_SEND,
+ messageCount.get());
+ } finally {
+
+ container1.stop();
+ container1.destroy();
+ container1 = null;
+
+ brokerService1.stop();
+ brokerService1 = null;
+ brokerService2.stop();
+ brokerService2 = null;
+ }
+ }
+
+
+ // This should fail with incubator-activemq-fuse-4.1.0.5
+ public void testQueueLimitsWithTwoBrokerProduceandConsumeonDifferentBrokersWithSeperateConnectionsForProducing()
+ throws Exception {
+
+ BrokerService brokerService1 = null;
+ BrokerService brokerService2 = null;
+ ActiveMQConnectionFactory acf1 = null;
+ ActiveMQConnectionFactory acf2 = null;
+ DefaultMessageListenerContainer container1 = null;
+ DefaultMessageListenerContainer container2 = null;
+
+ try {
+ brokerService1 = createBrokerService("broker1", URL1, URL2);
+ brokerService1.start();
+ brokerService2 = createBrokerService("broker2", URL2, URL1);
+ brokerService2.start();
+
+ acf1 = createConnectionFactory(URL1);
+ acf2 = createConnectionFactory(URL2);
+
+ Thread.sleep(1000);
+
+ doneLatch = new CountDownLatch(NUM_MESSAGE_TO_SEND*MAX_PRODUCERS);
+
+ container1 = createDefaultMessageListenerContainer(acf2, new TestMessageListener1(500), QUEUE1_NAME);
+ container1.afterPropertiesSet();
+ container2 = createDefaultMessageListenerContainer(acf2, new TestMessageListener1(30000), QUEUE2_NAME);
+ container2.afterPropertiesSet();
+
+ final ExecutorService executor = Executors.newCachedThreadPool();
+ for (int i = 0; i < MAX_PRODUCERS; i++) {
+ executor.submit(new NonPooledProducerTask(acf1, QUEUE2_NAME));
+ Thread.sleep(1000);
+ executor.submit(new NonPooledProducerTask(acf1, QUEUE1_NAME));
+ }
+
+ assertTrue(doneLatch.await(20, TimeUnit.SECONDS));
+ executor.shutdownNow();
+
+ Assert.assertEquals(MAX_PRODUCERS * NUM_MESSAGE_TO_SEND, messageCount.get());
+ } finally {
+
+ container1.stop();
+ container1.destroy();
+ container1 = null;
+
+ container2.stop();
+ container2.destroy();
+ container2 = null;
+
+ brokerService1.stop();
+ brokerService1 = null;
+ brokerService2.stop();
+ brokerService2 = null;
+ }
+ }
+
+
+
+
+ private BrokerService createBrokerService(final String brokerName,
+ final String uri1, final String uri2) throws Exception {
+ final BrokerService brokerService = new BrokerService();
+
+ brokerService.setBrokerName(brokerName);
+ brokerService.setPersistent(false);
+ brokerService.setUseJmx(true);
+
+ final UsageManager memoryManager = new UsageManager();
+ memoryManager.setLimit(5000000);
+ brokerService.setMemoryManager(memoryManager);
+
+ final ArrayList policyEntries = new ArrayList();
+
+ final PolicyEntry entry = new PolicyEntry();
+ entry.setQueue(">");
+ // entry.setQueue(QUEUE1_NAME);
+ entry.setMemoryLimit(1000);
+ policyEntries.add(entry);
+
+ final PolicyMap policyMap = new PolicyMap();
+ policyMap.setPolicyEntries(policyEntries);
+ brokerService.setDestinationPolicy(policyMap);
+
+ final TransportConnector tConnector = new TransportConnector();
+ tConnector.setUri(new URI(uri1));
+ tConnector.setBrokerName(brokerName);
+ tConnector.setName(brokerName + ".transportConnector");
+ brokerService.addConnector(tConnector);
+
+ if (uri2 != null) {
+ final NetworkConnector nc = new DiscoveryNetworkConnector(new URI("static:" + uri2));
+ nc.setBridgeTempDestinations(true);
+ nc.setBrokerName(brokerName);
+ nc.setName(brokerName + ".nc");
+ brokerService.addNetworkConnector(nc);
+ }
+
+ return brokerService;
+
+ }
+
+ public DefaultMessageListenerContainer createDefaultMessageListenerContainer(
+ final ConnectionFactory acf, final MessageListener listener,
+ final String queue) {
+ final DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
+ container.setConnectionFactory(acf);
+ container.setDestinationName(queue);
+ container.setMessageListener(listener);
+ container.setSessionTransacted(false);
+ container.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
+ container.setConcurrentConsumers(MAX_CONSUMERS);
+ return container;
+ }
+
+ public ActiveMQConnectionFactory createConnectionFactory(final String url) {
+ final ActiveMQConnectionFactory acf = new ActiveMQConnectionFactory(url);
+ acf.setCopyMessageOnSend(false);
+ acf.setUseAsyncSend(false);
+ acf.setDispatchAsync(true);
+ acf.setUseCompression(false);
+ acf.setOptimizeAcknowledge(false);
+ acf.setOptimizedMessageDispatch(true);
+ acf.setUseSyncSend(true);
+ return acf;
+ }
+
+ private class TestMessageListener1 implements MessageListener {
+
+ private final long waitTime;
+
+ public TestMessageListener1(long waitTime) {
+ this.waitTime = waitTime;
+
+ }
+
+ public void onMessage(Message msg) {
+
+ try {
+ System.out.println("Listener1 Consumed message "+ msg.getIntProperty("count"));
+
+ messageCount.incrementAndGet();
+ doneLatch.countDown();
+
+ Thread.sleep(waitTime);
+ } catch (JMSException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ } catch (InterruptedException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+
+ }
+ }
+
+
+ private class PooledProducerTask implements Runnable {
+
+ private final String queueName;
+
+ private final PooledConnectionFactory pcf;
+
+ public PooledProducerTask(final PooledConnectionFactory pcf,
+ final String queueName) {
+ this.pcf = pcf;
+ this.queueName = queueName;
+ }
+
+ public void run() {
+
+ try {
+
+ final JmsTemplate jmsTemplate = new JmsTemplate(pcf);
+ jmsTemplate.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ jmsTemplate.setExplicitQosEnabled(true);
+ jmsTemplate.setMessageIdEnabled(false);
+ jmsTemplate.setMessageTimestampEnabled(false);
+ jmsTemplate.afterPropertiesSet();
+
+ final byte[] bytes = new byte[2048];
+ final Random r = new Random();
+ r.nextBytes(bytes);
+
+ Thread.sleep(2000);
+
+ final AtomicInteger count = new AtomicInteger();
+ for (int i = 0; i < NUM_MESSAGE_TO_SEND; i++) {
+ jmsTemplate.send(queueName, new MessageCreator() {
+
+ public Message createMessage(Session session)
+ throws JMSException {
+
+ final BytesMessage message = session.createBytesMessage();
+
+ message.writeBytes(bytes);
+ message.setIntProperty("count", count.incrementAndGet());
+ message.setStringProperty("producer", "pooled");
+ return message;
+ }
+ });
+
+ System.out.println("PooledProducer sent message: "+ count.get());
+ // Thread.sleep(1000);
+ }
+
+ } catch (final Throwable e) {
+ System.err.println("Producer 1 is exiting.");
+ e.printStackTrace();
+ }
+ }
+ }
+
+
+ private class NonPooledProducerTask implements Runnable {
+
+ private final String queueName;
+
+ private final ConnectionFactory cf;
+
+ public NonPooledProducerTask(final ConnectionFactory cf,
+ final String queueName) {
+ this.cf = cf;
+ this.queueName = queueName;
+ }
+
+ public void run() {
+
+ try {
+
+ final JmsTemplate jmsTemplate = new JmsTemplate(cf);
+ jmsTemplate.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ jmsTemplate.setExplicitQosEnabled(true);
+ jmsTemplate.setMessageIdEnabled(false);
+ jmsTemplate.setMessageTimestampEnabled(false);
+ jmsTemplate.afterPropertiesSet();
+
+ final byte[] bytes = new byte[2048];
+ final Random r = new Random();
+ r.nextBytes(bytes);
+
+ Thread.sleep(2000);
+
+ final AtomicInteger count = new AtomicInteger();
+ for (int i = 0; i < NUM_MESSAGE_TO_SEND; i++) {
+ jmsTemplate.send(queueName, new MessageCreator() {
+
+ public Message createMessage(Session session)
+ throws JMSException {
+
+ final BytesMessage message = session
+ .createBytesMessage();
+
+ message.writeBytes(bytes);
+ message.setIntProperty("count", count
+ .incrementAndGet());
+ message.setStringProperty("producer", "non-pooled");
+ return message;
+ }
+ });
+
+ System.out.println("Non-PooledProducer sent message: " + count.get());
+
+ // Thread.sleep(1000);
+ }
+
+ } catch (final Throwable e) {
+ System.err.println("Producer 1 is exiting.");
+ e.printStackTrace();
+ }
+ }
+ }
+
+}
Propchange: activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/AMQDeadlockTest3.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java?view=diff&rev=515927&r1=515926&r2=515927
==============================================================================
--- activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java (original)
+++ activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java Wed Mar 7 21:24:44 2007
@@ -1,161 +1,161 @@
-package org.apache.activemq;
-
-import java.io.IOException;
-
-import javax.jms.ConnectionFactory;
-import javax.jms.DeliveryMode;
-import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.TransportConnector;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.transport.tcp.TcpTransport;
-
-import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
-import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
-import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
-
-public class ProducerFlowControlTest extends JmsTestSupport {
-
- ActiveMQQueue queueA = new ActiveMQQueue("QUEUE.A");
- ActiveMQQueue queueB = new ActiveMQQueue("QUEUE.B");
- private TransportConnector connector;
- private ActiveMQConnection connection;
-
- public void test2ndPubisherWithSyncSendConnectionThatIsBlocked() throws Exception {
- ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory) createConnectionFactory();
- factory.setUseSyncSend(true);
- connection = (ActiveMQConnection) factory.createConnection();
- connections.add(connection);
- connection.start();
-
- // Test sending to Queue A
- // 1st send should not block.
- fillQueue(queueA);
-
- Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- MessageConsumer consumer = session.createConsumer(queueB);
-
- // Test sending to Queue B it should block.
- // Since even though the it's queue limits have not been reached, the connection
- // is blocked.
- CountDownLatch pubishDoneToQeueuB = asyncSendTo(queueB, "Message 1");
- assertTrue( pubishDoneToQeueuB.await(2, TimeUnit.SECONDS) );
-
- TextMessage msg = (TextMessage) consumer.receive();
- assertEquals("Message 1", msg.getText());
- msg.acknowledge();
-
- pubishDoneToQeueuB = asyncSendTo(queueB, "Message 2");
- assertTrue( pubishDoneToQeueuB.await(2, TimeUnit.SECONDS) );
-
- msg = (TextMessage) consumer.receive();
- assertEquals("Message 2", msg.getText());
- msg.acknowledge();
- }
-
- public void test2ndPubisherWithStandardConnectionThatIsBlocked() throws Exception {
- ConnectionFactory factory = createConnectionFactory();
- connection = (ActiveMQConnection) factory.createConnection();
- connections.add(connection);
- connection.start();
-
- // Test sending to Queue A
- // 1st send should not block.
- fillQueue(queueA);
-
- // Test sending to Queue B it should block.
- // Since even though the it's queue limits have not been reached, the connection
- // is blocked.
- CountDownLatch pubishDoneToQeueuB = asyncSendTo(queueB, "Message 1");
- assertFalse( pubishDoneToQeueuB.await(2, TimeUnit.SECONDS) );
- }
-
-
- private void fillQueue(final ActiveMQQueue queue) throws JMSException, InterruptedException {
- final AtomicBoolean done = new AtomicBoolean(true);
- final AtomicBoolean keepGoing = new AtomicBoolean(true);
-
- // Starts an async thread that every time it publishes it sets the done flag to false.
- // Once the send starts to block it will not reset the done flag anymore.
- new Thread("Fill thread.") {
- public void run() {
- Session session=null;
- try {
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = session.createProducer(queue);
- producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- while( keepGoing.get() ) {
- done.set(false);
- producer.send(session.createTextMessage("Hello World"));
- }
- } catch (JMSException e) {
- } finally {
- safeClose(session);
- }
- }
- }.start();
-
- while( true ) {
- Thread.sleep(1000);
- // the producer is blocked once the done flag stays true.
- if( done.get() )
- break;
- done.set(true);
- }
- keepGoing.set(false);
- }
-
- private CountDownLatch asyncSendTo(final ActiveMQQueue queue, final String message) throws JMSException {
- final CountDownLatch done = new CountDownLatch(1);
- new Thread("Send thread.") {
- public void run() {
- Session session=null;
- try {
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = session.createProducer(queue);
- producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- producer.send(session.createTextMessage(message));
- done.countDown();
- } catch (JMSException e) {
- } finally {
- safeClose(session);
- }
- }
- }.start();
- return done;
- }
-
- protected BrokerService createBroker() throws Exception {
- BrokerService service = new BrokerService();
- service.setPersistent(false);
- service.setUseJmx(true);
-
- // Setup a destination policy where it takes only 1 message at a time.
- PolicyMap policyMap = new PolicyMap();
- PolicyEntry policy = new PolicyEntry();
- policy.setMemoryLimit(1);
- policyMap.setDefaultEntry(policy);
- service.setDestinationPolicy(policyMap);
-
- connector = service.addConnector("tcp://localhost:0");
- return service;
- }
-
- protected void tearDown() throws Exception {
- TcpTransport t = (TcpTransport) connection.getTransport().narrow(TcpTransport.class);
- t.getTransportListener().onException(new IOException("Disposed."));
- connection.getTransport().stop();
- super.tearDown();
- }
-
- protected ConnectionFactory createConnectionFactory() throws Exception {
- return new ActiveMQConnectionFactory(connector.getConnectUri());
- }
-}
+package org.apache.activemq;
+
+import java.io.IOException;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.transport.tcp.TcpTransport;
+
+import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
+import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
+
+public class ProducerFlowControlTest extends JmsTestSupport {
+
+ ActiveMQQueue queueA = new ActiveMQQueue("QUEUE.A");
+ ActiveMQQueue queueB = new ActiveMQQueue("QUEUE.B");
+ private TransportConnector connector;
+ private ActiveMQConnection connection;
+
+ public void test2ndPubisherWithSyncSendConnectionThatIsBlocked() throws Exception {
+ ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory) createConnectionFactory();
+ factory.setUseSyncSend(true);
+ connection = (ActiveMQConnection) factory.createConnection();
+ connections.add(connection);
+ connection.start();
+
+ // Test sending to Queue A
+ // 1st send should not block.
+ fillQueue(queueA);
+
+ Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createConsumer(queueB);
+
+ // Test sending to Queue B it should block.
+ // Since even though the it's queue limits have not been reached, the connection
+ // is blocked.
+ CountDownLatch pubishDoneToQeueuB = asyncSendTo(queueB, "Message 1");
+ assertTrue( pubishDoneToQeueuB.await(2, TimeUnit.SECONDS) );
+
+ TextMessage msg = (TextMessage) consumer.receive();
+ assertEquals("Message 1", msg.getText());
+ msg.acknowledge();
+
+ pubishDoneToQeueuB = asyncSendTo(queueB, "Message 2");
+ assertTrue( pubishDoneToQeueuB.await(2, TimeUnit.SECONDS) );
+
+ msg = (TextMessage) consumer.receive();
+ assertEquals("Message 2", msg.getText());
+ msg.acknowledge();
+ }
+
+ public void test2ndPubisherWithStandardConnectionThatIsBlocked() throws Exception {
+ ConnectionFactory factory = createConnectionFactory();
+ connection = (ActiveMQConnection) factory.createConnection();
+ connections.add(connection);
+ connection.start();
+
+ // Test sending to Queue A
+ // 1st send should not block.
+ fillQueue(queueA);
+
+ // Test sending to Queue B it should block.
+ // Since even though the it's queue limits have not been reached, the connection
+ // is blocked.
+ CountDownLatch pubishDoneToQeueuB = asyncSendTo(queueB, "Message 1");
+ assertFalse( pubishDoneToQeueuB.await(2, TimeUnit.SECONDS) );
+ }
+
+
+ private void fillQueue(final ActiveMQQueue queue) throws JMSException, InterruptedException {
+ final AtomicBoolean done = new AtomicBoolean(true);
+ final AtomicBoolean keepGoing = new AtomicBoolean(true);
+
+ // Starts an async thread that every time it publishes it sets the done flag to false.
+ // Once the send starts to block it will not reset the done flag anymore.
+ new Thread("Fill thread.") {
+ public void run() {
+ Session session=null;
+ try {
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(queue);
+ producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ while( keepGoing.get() ) {
+ done.set(false);
+ producer.send(session.createTextMessage("Hello World"));
+ }
+ } catch (JMSException e) {
+ } finally {
+ safeClose(session);
+ }
+ }
+ }.start();
+
+ while( true ) {
+ Thread.sleep(1000);
+ // the producer is blocked once the done flag stays true.
+ if( done.get() )
+ break;
+ done.set(true);
+ }
+ keepGoing.set(false);
+ }
+
+ private CountDownLatch asyncSendTo(final ActiveMQQueue queue, final String message) throws JMSException {
+ final CountDownLatch done = new CountDownLatch(1);
+ new Thread("Send thread.") {
+ public void run() {
+ Session session=null;
+ try {
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(queue);
+ producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ producer.send(session.createTextMessage(message));
+ done.countDown();
+ } catch (JMSException e) {
+ } finally {
+ safeClose(session);
+ }
+ }
+ }.start();
+ return done;
+ }
+
+ protected BrokerService createBroker() throws Exception {
+ BrokerService service = new BrokerService();
+ service.setPersistent(false);
+ service.setUseJmx(true);
+
+ // Setup a destination policy where it takes only 1 message at a time.
+ PolicyMap policyMap = new PolicyMap();
+ PolicyEntry policy = new PolicyEntry();
+ policy.setMemoryLimit(1);
+ policyMap.setDefaultEntry(policy);
+ service.setDestinationPolicy(policyMap);
+
+ connector = service.addConnector("tcp://localhost:0");
+ return service;
+ }
+
+ protected void tearDown() throws Exception {
+ TcpTransport t = (TcpTransport) connection.getTransport().narrow(TcpTransport.class);
+ t.getTransportListener().onException(new IOException("Disposed."));
+ connection.getTransport().stop();
+ super.tearDown();
+ }
+
+ protected ConnectionFactory createConnectionFactory() throws Exception {
+ return new ActiveMQConnectionFactory(connector.getConnectUri());
+ }
+}
Propchange: activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/TimeStampTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/command/MessageCompressionTest.java
------------------------------------------------------------------------------
svn:eol-style = native