You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2007/03/08 06:24:51 UTC

svn commit: r515927 - in /activemq/branches/activemq-4.1/activemq-core/src: main/java/org/apache/activemq/broker/util/ test/java/org/apache/activemq/ test/java/org/apache/activemq/command/

Author: chirino
Date: Wed Mar  7 21:24:44 2007
New Revision: 515927

URL: http://svn.apache.org/viewvc?view=rev&rev=515927
Log:
Set the eol-style to native

Modified:
    activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/util/TimeStampingBrokerPlugin.java   (props changed)
    activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/AMQDeadlockTest3.java   (contents, props changed)
    activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java   (contents, props changed)
    activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/TimeStampTest.java   (props changed)
    activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/command/MessageCompressionTest.java   (props changed)

Propchange: activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/util/TimeStampingBrokerPlugin.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/AMQDeadlockTest3.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/AMQDeadlockTest3.java?view=diff&rev=515927&r1=515926&r2=515927
==============================================================================
--- activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/AMQDeadlockTest3.java (original)
+++ activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/AMQDeadlockTest3.java Wed Mar  7 21:24:44 2007
@@ -1,439 +1,439 @@
-package org.apache.activemq;
-
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Random;
-
-import javax.jms.BytesMessage;
-import javax.jms.ConnectionFactory;
-import javax.jms.DeliveryMode;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageListener;
-import javax.jms.Session;
-
-import junit.framework.Assert;
-import junit.framework.TestCase;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.TransportConnector;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.memory.UsageManager;
-import org.apache.activemq.network.DiscoveryNetworkConnector;
-import org.apache.activemq.network.NetworkConnector;
-import org.apache.activemq.pool.PooledConnectionFactory;
-import org.springframework.jms.core.JmsTemplate;
-import org.springframework.jms.core.MessageCreator;
-import org.springframework.jms.listener.DefaultMessageListenerContainer;
-
-import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
-import edu.emory.mathcs.backport.java.util.concurrent.ExecutorService;
-import edu.emory.mathcs.backport.java.util.concurrent.Executors;
-import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
-import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
-
-public class AMQDeadlockTest3 extends TestCase {
-
-	private static final String URL1 = "tcp://localhost:61616";
-
-	private static final String URL2 = "tcp://localhost:61617";
-
-	private static final String QUEUE1_NAME = "test.queue.1";
-
-	private static final String QUEUE2_NAME = "test.queue.2";
-
-	private static final int MAX_CONSUMERS = 1;
-
-	private static final int MAX_PRODUCERS = 1;
-
-	private static final int NUM_MESSAGE_TO_SEND = 10;
-
-	private AtomicInteger messageCount = new AtomicInteger();
-	private CountDownLatch doneLatch;
-
-	public void setUp() throws Exception {
-	}
-
-	public void tearDown() throws Exception {
-	}
-
-	// This should fail with incubator-activemq-fuse-4.1.0.5
-	public void testQueueLimitsWithOneBrokerSameConnection() throws Exception {
-
-		BrokerService brokerService1 = null;
-		ActiveMQConnectionFactory acf = null;
-		PooledConnectionFactory pcf = null;
-		DefaultMessageListenerContainer container1 = null;
-
-		try {
-			brokerService1 = createBrokerService("broker1", URL1, null);
-			brokerService1.start();
-
-			acf = createConnectionFactory(URL1);
-			pcf = new PooledConnectionFactory(acf);
-
-			// Only listen on the first queue.. let the 2nd queue fill up.
-			doneLatch = new CountDownLatch(NUM_MESSAGE_TO_SEND);
-			container1 = createDefaultMessageListenerContainer(acf,	new TestMessageListener1(500), QUEUE1_NAME);
-			container1.afterPropertiesSet();
-
-			Thread.sleep(2000);
-
-			final ExecutorService executor = Executors.newCachedThreadPool();
-			for (int i = 0; i < MAX_PRODUCERS; i++) {
-				executor.submit(new PooledProducerTask(pcf, QUEUE2_NAME));
-				Thread.sleep(1000);
-				executor.submit(new PooledProducerTask(pcf, QUEUE1_NAME));
-			}
-			
-			// Wait for all message to arrive.
-			assertTrue(doneLatch.await(20, TimeUnit.SECONDS));			
-			executor.shutdownNow();
-
-			Assert.assertEquals(NUM_MESSAGE_TO_SEND, messageCount.get());
-
-		} finally {
-
-			container1.stop();
-			container1.destroy();
-			container1 = null;
-			brokerService1.stop();
-			brokerService1 = null;
-
-		}
-
-	}
-	
-
-
-	
-	// This should fail with incubator-activemq-fuse-4.1.0.5
-	public void testQueueLimitsWithTwoBrokerProduceandConsumeonDifferentBrokersWithOneConnectionForProducing()
-			throws Exception {
-
-		BrokerService brokerService1 = null;
-		BrokerService brokerService2 = null;
-		ActiveMQConnectionFactory acf1 = null;
-		ActiveMQConnectionFactory acf2 = null;
-		PooledConnectionFactory pcf = null;
-		DefaultMessageListenerContainer container1 = null;
-
-		try {
-			brokerService1 = createBrokerService("broker1", URL1, URL2);
-			brokerService1.start();
-			brokerService2 = createBrokerService("broker2", URL2, URL1);
-			brokerService2.start();
-
-			acf1 = createConnectionFactory(URL1);
-			acf2 = createConnectionFactory(URL2);
-
-			pcf = new PooledConnectionFactory(acf1);
-
-			Thread.sleep(1000);
-
-			doneLatch = new CountDownLatch(MAX_PRODUCERS * NUM_MESSAGE_TO_SEND);
-			container1 = createDefaultMessageListenerContainer(acf2, new TestMessageListener1(500), QUEUE1_NAME);
-			container1.afterPropertiesSet();
-
-			final ExecutorService executor = Executors.newCachedThreadPool();
-			for (int i = 0; i < MAX_PRODUCERS; i++) {
-				executor.submit(new PooledProducerTask(pcf, QUEUE2_NAME));
-				Thread.sleep(1000);
-				executor.submit(new PooledProducerTask(pcf, QUEUE1_NAME));
-			}
-
-			assertTrue(doneLatch.await(20, TimeUnit.SECONDS));			
-			executor.shutdownNow();
-
-			Assert.assertEquals(MAX_PRODUCERS * NUM_MESSAGE_TO_SEND,
-					messageCount.get());
-		} finally {
-
-			container1.stop();
-			container1.destroy();
-			container1 = null;
-
-			brokerService1.stop();
-			brokerService1 = null;
-			brokerService2.stop();
-			brokerService2 = null;
-		}
-	}
-	
-	
-	// This should fail with incubator-activemq-fuse-4.1.0.5
-	public void testQueueLimitsWithTwoBrokerProduceandConsumeonDifferentBrokersWithSeperateConnectionsForProducing()
-			throws Exception {
-
-		BrokerService brokerService1 = null;
-		BrokerService brokerService2 = null;
-		ActiveMQConnectionFactory acf1 = null;
-		ActiveMQConnectionFactory acf2 = null;
-		DefaultMessageListenerContainer container1 = null;
-		DefaultMessageListenerContainer container2 = null;
-		
-		try {
-			brokerService1 = createBrokerService("broker1", URL1, URL2);
-			brokerService1.start();
-			brokerService2 = createBrokerService("broker2", URL2, URL1);
-			brokerService2.start();
-
-			acf1 = createConnectionFactory(URL1);
-			acf2 = createConnectionFactory(URL2);
-
-			Thread.sleep(1000);
-
-			doneLatch = new CountDownLatch(NUM_MESSAGE_TO_SEND*MAX_PRODUCERS);
-
-			container1 = createDefaultMessageListenerContainer(acf2, new TestMessageListener1(500), QUEUE1_NAME);
-			container1.afterPropertiesSet();
-			container2 = createDefaultMessageListenerContainer(acf2, new TestMessageListener1(30000), QUEUE2_NAME);
-			container2.afterPropertiesSet();
-
-			final ExecutorService executor = Executors.newCachedThreadPool();
-			for (int i = 0; i < MAX_PRODUCERS; i++) {
-				executor.submit(new NonPooledProducerTask(acf1, QUEUE2_NAME));
-				Thread.sleep(1000);
-				executor.submit(new NonPooledProducerTask(acf1, QUEUE1_NAME));
-			}
-
-			assertTrue(doneLatch.await(20, TimeUnit.SECONDS));			
-			executor.shutdownNow();
-
-			Assert.assertEquals(MAX_PRODUCERS * NUM_MESSAGE_TO_SEND, messageCount.get());
-		} finally {
-
-			container1.stop();
-			container1.destroy();
-			container1 = null;
-			
-			container2.stop();
-			container2.destroy();
-			container2 = null;
-
-			brokerService1.stop();
-			brokerService1 = null;
-			brokerService2.stop();
-			brokerService2 = null;
-		}
-	}
-
-
-
-
-	private BrokerService createBrokerService(final String brokerName,
-			final String uri1, final String uri2) throws Exception {
-		final BrokerService brokerService = new BrokerService();
-
-		brokerService.setBrokerName(brokerName);
-		brokerService.setPersistent(false);
-		brokerService.setUseJmx(true);
-
-		final UsageManager memoryManager = new UsageManager();
-		memoryManager.setLimit(5000000);
-		brokerService.setMemoryManager(memoryManager);
-
-		final ArrayList policyEntries = new ArrayList();
-
-		final PolicyEntry entry = new PolicyEntry();
-		entry.setQueue(">");
-		// entry.setQueue(QUEUE1_NAME);
-		entry.setMemoryLimit(1000);
-		policyEntries.add(entry);
-
-		final PolicyMap policyMap = new PolicyMap();
-		policyMap.setPolicyEntries(policyEntries);
-		brokerService.setDestinationPolicy(policyMap);
-
-		final TransportConnector tConnector = new TransportConnector();
-		tConnector.setUri(new URI(uri1));
-		tConnector.setBrokerName(brokerName);
-		tConnector.setName(brokerName + ".transportConnector");
-		brokerService.addConnector(tConnector);
-
-		if (uri2 != null) {
-			final NetworkConnector nc = new DiscoveryNetworkConnector(new URI("static:" + uri2));
-			nc.setBridgeTempDestinations(true);
-			nc.setBrokerName(brokerName);
-			nc.setName(brokerName + ".nc");
-			brokerService.addNetworkConnector(nc);
-		}
-
-		return brokerService;
-
-	}
-
-	public DefaultMessageListenerContainer createDefaultMessageListenerContainer(
-			final ConnectionFactory acf, final MessageListener listener,
-			final String queue) {
-		final DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
-		container.setConnectionFactory(acf);
-		container.setDestinationName(queue);
-		container.setMessageListener(listener);
-		container.setSessionTransacted(false);
-		container.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
-		container.setConcurrentConsumers(MAX_CONSUMERS);
-		return container;
-	}
-
-	public ActiveMQConnectionFactory createConnectionFactory(final String url) {
-		final ActiveMQConnectionFactory acf = new ActiveMQConnectionFactory(url);
-		acf.setCopyMessageOnSend(false);
-		acf.setUseAsyncSend(false);
-		acf.setDispatchAsync(true);
-		acf.setUseCompression(false);
-		acf.setOptimizeAcknowledge(false);
-		acf.setOptimizedMessageDispatch(true);
-		acf.setUseSyncSend(true);
-		return acf;
-	}
-
-	private class TestMessageListener1 implements MessageListener {
-
-		private final long waitTime;
-
-		public TestMessageListener1(long waitTime) {
-			this.waitTime = waitTime;
-		
-		}
-
-		public void onMessage(Message msg) {
-
-			try {
-				System.out.println("Listener1 Consumed message "+ msg.getIntProperty("count"));
-
-				messageCount.incrementAndGet();
-				doneLatch.countDown();
-				
-				Thread.sleep(waitTime);
-			} catch (JMSException e) {
-				// TODO Auto-generated catch block
-				e.printStackTrace();
-			} catch (InterruptedException e) {
-				// TODO Auto-generated catch block
-				e.printStackTrace();
-			}
-
-		}
-	}
-
-
-	private class PooledProducerTask implements Runnable {
-
-		private final String queueName;
-
-		private final PooledConnectionFactory pcf;
-
-		public PooledProducerTask(final PooledConnectionFactory pcf,
-				final String queueName) {
-			this.pcf = pcf;
-			this.queueName = queueName;
-		}
-
-		public void run() {
-
-			try {
-
-				final JmsTemplate jmsTemplate = new JmsTemplate(pcf);
-				jmsTemplate.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-				jmsTemplate.setExplicitQosEnabled(true);
-				jmsTemplate.setMessageIdEnabled(false);
-				jmsTemplate.setMessageTimestampEnabled(false);
-				jmsTemplate.afterPropertiesSet();
-
-				final byte[] bytes = new byte[2048];
-				final Random r = new Random();
-				r.nextBytes(bytes);
-
-				Thread.sleep(2000);
-
-				final AtomicInteger count = new AtomicInteger();
-				for (int i = 0; i < NUM_MESSAGE_TO_SEND; i++) {
-					jmsTemplate.send(queueName, new MessageCreator() {
-
-						public Message createMessage(Session session)
-								throws JMSException {
-
-							final BytesMessage message = session.createBytesMessage();
-
-							message.writeBytes(bytes);
-							message.setIntProperty("count", count.incrementAndGet());
-							message.setStringProperty("producer", "pooled");
-							return message;
-						}
-					});
-
-					System.out.println("PooledProducer sent message: "+ count.get());
-					// Thread.sleep(1000);
-				}
-
-			} catch (final Throwable e) {
-				System.err.println("Producer 1 is exiting.");
-				e.printStackTrace();
-			}
-		}
-	}
-	
-	
-	private class NonPooledProducerTask implements Runnable {
-
-		private final String queueName;
-
-		private final ConnectionFactory cf;
-
-		public NonPooledProducerTask(final ConnectionFactory cf,
-				final String queueName) {
-			this.cf = cf;
-			this.queueName = queueName;
-		}
-
-		public void run() {
-
-			try {
-
-				final JmsTemplate jmsTemplate = new JmsTemplate(cf);
-				jmsTemplate.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-				jmsTemplate.setExplicitQosEnabled(true);
-				jmsTemplate.setMessageIdEnabled(false);
-				jmsTemplate.setMessageTimestampEnabled(false);
-				jmsTemplate.afterPropertiesSet();
-
-				final byte[] bytes = new byte[2048];
-				final Random r = new Random();
-				r.nextBytes(bytes);
-
-				Thread.sleep(2000);
-
-				final AtomicInteger count = new AtomicInteger();
-				for (int i = 0; i < NUM_MESSAGE_TO_SEND; i++) {
-					jmsTemplate.send(queueName, new MessageCreator() {
-
-						public Message createMessage(Session session)
-								throws JMSException {
-
-							final BytesMessage message = session
-									.createBytesMessage();
-
-							message.writeBytes(bytes);
-							message.setIntProperty("count", count
-									.incrementAndGet());
-							message.setStringProperty("producer", "non-pooled");
-							return message;
-						}
-					});
-
-					System.out.println("Non-PooledProducer sent message: " + count.get());
-
-					// Thread.sleep(1000);
-				}
-
-			} catch (final Throwable e) {
-				System.err.println("Producer 1 is exiting.");
-				e.printStackTrace();
-			}
-		}
-	}
-
-}
+package org.apache.activemq;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Random;
+
+import javax.jms.BytesMessage;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.memory.UsageManager;
+import org.apache.activemq.network.DiscoveryNetworkConnector;
+import org.apache.activemq.network.NetworkConnector;
+import org.apache.activemq.pool.PooledConnectionFactory;
+import org.springframework.jms.core.JmsTemplate;
+import org.springframework.jms.core.MessageCreator;
+import org.springframework.jms.listener.DefaultMessageListenerContainer;
+
+import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
+import edu.emory.mathcs.backport.java.util.concurrent.ExecutorService;
+import edu.emory.mathcs.backport.java.util.concurrent.Executors;
+import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
+
+public class AMQDeadlockTest3 extends TestCase {
+
+	private static final String URL1 = "tcp://localhost:61616";
+
+	private static final String URL2 = "tcp://localhost:61617";
+
+	private static final String QUEUE1_NAME = "test.queue.1";
+
+	private static final String QUEUE2_NAME = "test.queue.2";
+
+	private static final int MAX_CONSUMERS = 1;
+
+	private static final int MAX_PRODUCERS = 1;
+
+	private static final int NUM_MESSAGE_TO_SEND = 10;
+
+	private AtomicInteger messageCount = new AtomicInteger();
+	private CountDownLatch doneLatch;
+
+	public void setUp() throws Exception {
+	}
+
+	public void tearDown() throws Exception {
+	}
+
+	// This should fail with incubator-activemq-fuse-4.1.0.5
+	public void testQueueLimitsWithOneBrokerSameConnection() throws Exception {
+
+		BrokerService brokerService1 = null;
+		ActiveMQConnectionFactory acf = null;
+		PooledConnectionFactory pcf = null;
+		DefaultMessageListenerContainer container1 = null;
+
+		try {
+			brokerService1 = createBrokerService("broker1", URL1, null);
+			brokerService1.start();
+
+			acf = createConnectionFactory(URL1);
+			pcf = new PooledConnectionFactory(acf);
+
+			// Only listen on the first queue.. let the 2nd queue fill up.
+			doneLatch = new CountDownLatch(NUM_MESSAGE_TO_SEND);
+			container1 = createDefaultMessageListenerContainer(acf,	new TestMessageListener1(500), QUEUE1_NAME);
+			container1.afterPropertiesSet();
+
+			Thread.sleep(2000);
+
+			final ExecutorService executor = Executors.newCachedThreadPool();
+			for (int i = 0; i < MAX_PRODUCERS; i++) {
+				executor.submit(new PooledProducerTask(pcf, QUEUE2_NAME));
+				Thread.sleep(1000);
+				executor.submit(new PooledProducerTask(pcf, QUEUE1_NAME));
+			}
+			
+			// Wait for all message to arrive.
+			assertTrue(doneLatch.await(20, TimeUnit.SECONDS));			
+			executor.shutdownNow();
+
+			Assert.assertEquals(NUM_MESSAGE_TO_SEND, messageCount.get());
+
+		} finally {
+
+			container1.stop();
+			container1.destroy();
+			container1 = null;
+			brokerService1.stop();
+			brokerService1 = null;
+
+		}
+
+	}
+	
+
+
+	
+	// This should fail with incubator-activemq-fuse-4.1.0.5
+	public void testQueueLimitsWithTwoBrokerProduceandConsumeonDifferentBrokersWithOneConnectionForProducing()
+			throws Exception {
+
+		BrokerService brokerService1 = null;
+		BrokerService brokerService2 = null;
+		ActiveMQConnectionFactory acf1 = null;
+		ActiveMQConnectionFactory acf2 = null;
+		PooledConnectionFactory pcf = null;
+		DefaultMessageListenerContainer container1 = null;
+
+		try {
+			brokerService1 = createBrokerService("broker1", URL1, URL2);
+			brokerService1.start();
+			brokerService2 = createBrokerService("broker2", URL2, URL1);
+			brokerService2.start();
+
+			acf1 = createConnectionFactory(URL1);
+			acf2 = createConnectionFactory(URL2);
+
+			pcf = new PooledConnectionFactory(acf1);
+
+			Thread.sleep(1000);
+
+			doneLatch = new CountDownLatch(MAX_PRODUCERS * NUM_MESSAGE_TO_SEND);
+			container1 = createDefaultMessageListenerContainer(acf2, new TestMessageListener1(500), QUEUE1_NAME);
+			container1.afterPropertiesSet();
+
+			final ExecutorService executor = Executors.newCachedThreadPool();
+			for (int i = 0; i < MAX_PRODUCERS; i++) {
+				executor.submit(new PooledProducerTask(pcf, QUEUE2_NAME));
+				Thread.sleep(1000);
+				executor.submit(new PooledProducerTask(pcf, QUEUE1_NAME));
+			}
+
+			assertTrue(doneLatch.await(20, TimeUnit.SECONDS));			
+			executor.shutdownNow();
+
+			Assert.assertEquals(MAX_PRODUCERS * NUM_MESSAGE_TO_SEND,
+					messageCount.get());
+		} finally {
+
+			container1.stop();
+			container1.destroy();
+			container1 = null;
+
+			brokerService1.stop();
+			brokerService1 = null;
+			brokerService2.stop();
+			brokerService2 = null;
+		}
+	}
+	
+	
+	// This should fail with incubator-activemq-fuse-4.1.0.5
+	public void testQueueLimitsWithTwoBrokerProduceandConsumeonDifferentBrokersWithSeperateConnectionsForProducing()
+			throws Exception {
+
+		BrokerService brokerService1 = null;
+		BrokerService brokerService2 = null;
+		ActiveMQConnectionFactory acf1 = null;
+		ActiveMQConnectionFactory acf2 = null;
+		DefaultMessageListenerContainer container1 = null;
+		DefaultMessageListenerContainer container2 = null;
+		
+		try {
+			brokerService1 = createBrokerService("broker1", URL1, URL2);
+			brokerService1.start();
+			brokerService2 = createBrokerService("broker2", URL2, URL1);
+			brokerService2.start();
+
+			acf1 = createConnectionFactory(URL1);
+			acf2 = createConnectionFactory(URL2);
+
+			Thread.sleep(1000);
+
+			doneLatch = new CountDownLatch(NUM_MESSAGE_TO_SEND*MAX_PRODUCERS);
+
+			container1 = createDefaultMessageListenerContainer(acf2, new TestMessageListener1(500), QUEUE1_NAME);
+			container1.afterPropertiesSet();
+			container2 = createDefaultMessageListenerContainer(acf2, new TestMessageListener1(30000), QUEUE2_NAME);
+			container2.afterPropertiesSet();
+
+			final ExecutorService executor = Executors.newCachedThreadPool();
+			for (int i = 0; i < MAX_PRODUCERS; i++) {
+				executor.submit(new NonPooledProducerTask(acf1, QUEUE2_NAME));
+				Thread.sleep(1000);
+				executor.submit(new NonPooledProducerTask(acf1, QUEUE1_NAME));
+			}
+
+			assertTrue(doneLatch.await(20, TimeUnit.SECONDS));			
+			executor.shutdownNow();
+
+			Assert.assertEquals(MAX_PRODUCERS * NUM_MESSAGE_TO_SEND, messageCount.get());
+		} finally {
+
+			container1.stop();
+			container1.destroy();
+			container1 = null;
+			
+			container2.stop();
+			container2.destroy();
+			container2 = null;
+
+			brokerService1.stop();
+			brokerService1 = null;
+			brokerService2.stop();
+			brokerService2 = null;
+		}
+	}
+
+
+
+
+	private BrokerService createBrokerService(final String brokerName,
+			final String uri1, final String uri2) throws Exception {
+		final BrokerService brokerService = new BrokerService();
+
+		brokerService.setBrokerName(brokerName);
+		brokerService.setPersistent(false);
+		brokerService.setUseJmx(true);
+
+		final UsageManager memoryManager = new UsageManager();
+		memoryManager.setLimit(5000000);
+		brokerService.setMemoryManager(memoryManager);
+
+		final ArrayList policyEntries = new ArrayList();
+
+		final PolicyEntry entry = new PolicyEntry();
+		entry.setQueue(">");
+		// entry.setQueue(QUEUE1_NAME);
+		entry.setMemoryLimit(1000);
+		policyEntries.add(entry);
+
+		final PolicyMap policyMap = new PolicyMap();
+		policyMap.setPolicyEntries(policyEntries);
+		brokerService.setDestinationPolicy(policyMap);
+
+		final TransportConnector tConnector = new TransportConnector();
+		tConnector.setUri(new URI(uri1));
+		tConnector.setBrokerName(brokerName);
+		tConnector.setName(brokerName + ".transportConnector");
+		brokerService.addConnector(tConnector);
+
+		if (uri2 != null) {
+			final NetworkConnector nc = new DiscoveryNetworkConnector(new URI("static:" + uri2));
+			nc.setBridgeTempDestinations(true);
+			nc.setBrokerName(brokerName);
+			nc.setName(brokerName + ".nc");
+			brokerService.addNetworkConnector(nc);
+		}
+
+		return brokerService;
+
+	}
+
+	public DefaultMessageListenerContainer createDefaultMessageListenerContainer(
+			final ConnectionFactory acf, final MessageListener listener,
+			final String queue) {
+		final DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
+		container.setConnectionFactory(acf);
+		container.setDestinationName(queue);
+		container.setMessageListener(listener);
+		container.setSessionTransacted(false);
+		container.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
+		container.setConcurrentConsumers(MAX_CONSUMERS);
+		return container;
+	}
+
+	public ActiveMQConnectionFactory createConnectionFactory(final String url) {
+		final ActiveMQConnectionFactory acf = new ActiveMQConnectionFactory(url);
+		acf.setCopyMessageOnSend(false);
+		acf.setUseAsyncSend(false);
+		acf.setDispatchAsync(true);
+		acf.setUseCompression(false);
+		acf.setOptimizeAcknowledge(false);
+		acf.setOptimizedMessageDispatch(true);
+		acf.setUseSyncSend(true);
+		return acf;
+	}
+
+	private class TestMessageListener1 implements MessageListener {
+
+		private final long waitTime;
+
+		public TestMessageListener1(long waitTime) {
+			this.waitTime = waitTime;
+		
+		}
+
+		public void onMessage(Message msg) {
+
+			try {
+				System.out.println("Listener1 Consumed message "+ msg.getIntProperty("count"));
+
+				messageCount.incrementAndGet();
+				doneLatch.countDown();
+				
+				Thread.sleep(waitTime);
+			} catch (JMSException e) {
+				// TODO Auto-generated catch block
+				e.printStackTrace();
+			} catch (InterruptedException e) {
+				// TODO Auto-generated catch block
+				e.printStackTrace();
+			}
+
+		}
+	}
+
+
+	private class PooledProducerTask implements Runnable {
+
+		private final String queueName;
+
+		private final PooledConnectionFactory pcf;
+
+		public PooledProducerTask(final PooledConnectionFactory pcf,
+				final String queueName) {
+			this.pcf = pcf;
+			this.queueName = queueName;
+		}
+
+		public void run() {
+
+			try {
+
+				final JmsTemplate jmsTemplate = new JmsTemplate(pcf);
+				jmsTemplate.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+				jmsTemplate.setExplicitQosEnabled(true);
+				jmsTemplate.setMessageIdEnabled(false);
+				jmsTemplate.setMessageTimestampEnabled(false);
+				jmsTemplate.afterPropertiesSet();
+
+				final byte[] bytes = new byte[2048];
+				final Random r = new Random();
+				r.nextBytes(bytes);
+
+				Thread.sleep(2000);
+
+				final AtomicInteger count = new AtomicInteger();
+				for (int i = 0; i < NUM_MESSAGE_TO_SEND; i++) {
+					jmsTemplate.send(queueName, new MessageCreator() {
+
+						public Message createMessage(Session session)
+								throws JMSException {
+
+							final BytesMessage message = session.createBytesMessage();
+
+							message.writeBytes(bytes);
+							message.setIntProperty("count", count.incrementAndGet());
+							message.setStringProperty("producer", "pooled");
+							return message;
+						}
+					});
+
+					System.out.println("PooledProducer sent message: "+ count.get());
+					// Thread.sleep(1000);
+				}
+
+			} catch (final Throwable e) {
+				System.err.println("Producer 1 is exiting.");
+				e.printStackTrace();
+			}
+		}
+	}
+	
+	
+	private class NonPooledProducerTask implements Runnable {
+
+		private final String queueName;
+
+		private final ConnectionFactory cf;
+
+		public NonPooledProducerTask(final ConnectionFactory cf,
+				final String queueName) {
+			this.cf = cf;
+			this.queueName = queueName;
+		}
+
+		public void run() {
+
+			try {
+
+				final JmsTemplate jmsTemplate = new JmsTemplate(cf);
+				jmsTemplate.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+				jmsTemplate.setExplicitQosEnabled(true);
+				jmsTemplate.setMessageIdEnabled(false);
+				jmsTemplate.setMessageTimestampEnabled(false);
+				jmsTemplate.afterPropertiesSet();
+
+				final byte[] bytes = new byte[2048];
+				final Random r = new Random();
+				r.nextBytes(bytes);
+
+				Thread.sleep(2000);
+
+				final AtomicInteger count = new AtomicInteger();
+				for (int i = 0; i < NUM_MESSAGE_TO_SEND; i++) {
+					jmsTemplate.send(queueName, new MessageCreator() {
+
+						public Message createMessage(Session session)
+								throws JMSException {
+
+							final BytesMessage message = session
+									.createBytesMessage();
+
+							message.writeBytes(bytes);
+							message.setIntProperty("count", count
+									.incrementAndGet());
+							message.setStringProperty("producer", "non-pooled");
+							return message;
+						}
+					});
+
+					System.out.println("Non-PooledProducer sent message: " + count.get());
+
+					// Thread.sleep(1000);
+				}
+
+			} catch (final Throwable e) {
+				System.err.println("Producer 1 is exiting.");
+				e.printStackTrace();
+			}
+		}
+	}
+
+}

Propchange: activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/AMQDeadlockTest3.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java?view=diff&rev=515927&r1=515926&r2=515927
==============================================================================
--- activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java (original)
+++ activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java Wed Mar  7 21:24:44 2007
@@ -1,161 +1,161 @@
-package org.apache.activemq;
-
-import java.io.IOException;
-
-import javax.jms.ConnectionFactory;
-import javax.jms.DeliveryMode;
-import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.TransportConnector;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.transport.tcp.TcpTransport;
-
-import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
-import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
-import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
-
-public class ProducerFlowControlTest extends JmsTestSupport {
-	
-	ActiveMQQueue queueA = new ActiveMQQueue("QUEUE.A");
-	ActiveMQQueue queueB = new ActiveMQQueue("QUEUE.B");
-	private TransportConnector connector;
-	private ActiveMQConnection connection;
-
-    public void test2ndPubisherWithSyncSendConnectionThatIsBlocked() throws Exception {
-        ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory) createConnectionFactory();
-        factory.setUseSyncSend(true);
-        connection = (ActiveMQConnection) factory.createConnection();
-        connections.add(connection);
-    	connection.start();
-
-    	// Test sending to Queue A
-    	// 1st send should not block.
-    	fillQueue(queueA);
-    	
-    	Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-    	MessageConsumer consumer = session.createConsumer(queueB);
-
-    	// Test sending to Queue B it should block. 
-    	// Since even though  the it's queue limits have not been reached, the connection
-    	// is blocked.
-    	CountDownLatch pubishDoneToQeueuB = asyncSendTo(queueB, "Message 1");
-    	assertTrue( pubishDoneToQeueuB.await(2, TimeUnit.SECONDS) );
-    	
-    	TextMessage msg = (TextMessage) consumer.receive();
-    	assertEquals("Message 1", msg.getText());
-    	msg.acknowledge();
-    	
-    	pubishDoneToQeueuB = asyncSendTo(queueB, "Message 2");
-    	assertTrue( pubishDoneToQeueuB.await(2, TimeUnit.SECONDS) );
-    	
-    	msg = (TextMessage) consumer.receive();
-    	assertEquals("Message 2", msg.getText());
-    	msg.acknowledge();
-    }
-
-    public void test2ndPubisherWithStandardConnectionThatIsBlocked() throws Exception {
-        ConnectionFactory factory = createConnectionFactory();
-        connection = (ActiveMQConnection) factory.createConnection();
-        connections.add(connection);
-    	connection.start();
-
-    	// Test sending to Queue A
-    	// 1st send should not block.
-    	fillQueue(queueA);
-
-    	// Test sending to Queue B it should block. 
-    	// Since even though  the it's queue limits have not been reached, the connection
-    	// is blocked.
-    	CountDownLatch pubishDoneToQeueuB = asyncSendTo(queueB, "Message 1");
-    	assertFalse( pubishDoneToQeueuB.await(2, TimeUnit.SECONDS) );    	
-    }
-
-
-	private void fillQueue(final ActiveMQQueue queue) throws JMSException, InterruptedException {
-		final AtomicBoolean done = new AtomicBoolean(true);
-		final AtomicBoolean keepGoing = new AtomicBoolean(true);
-		
-		// Starts an async thread that every time it publishes it sets the done flag to false.
-		// Once the send starts to block it will not reset the done flag anymore.
-		new Thread("Fill thread.") {
-			public void run() {
-				Session session=null;
-		    	try {
-					session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-					MessageProducer producer = session.createProducer(queue);
-					producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-					while( keepGoing.get() ) {
-						done.set(false);
-						producer.send(session.createTextMessage("Hello World"));						
-					}
-				} catch (JMSException e) {
-				} finally {
-					safeClose(session);
-				}
-			}
-		}.start();
-		
-		while( true ) {
-			Thread.sleep(1000);
-			// the producer is blocked once the done flag stays true.
-			if( done.get() )
-				break;
-			done.set(true);
-		}		
-		keepGoing.set(false);
-	}
-
-	private CountDownLatch asyncSendTo(final ActiveMQQueue queue, final String message) throws JMSException {
-		final CountDownLatch done = new CountDownLatch(1);
-		new Thread("Send thread.") {
-			public void run() {
-				Session session=null;
-		    	try {
-					session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-					MessageProducer producer = session.createProducer(queue);
-					producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-					producer.send(session.createTextMessage(message));
-					done.countDown();
-				} catch (JMSException e) {
-				} finally {
-					safeClose(session);
-				}
-			}
-		}.start();    	
-		return done;
-	}
-
-    protected BrokerService createBroker() throws Exception {
-        BrokerService service = new BrokerService();
-        service.setPersistent(false);
-        service.setUseJmx(true);
-        
-        // Setup a destination policy where it takes only 1 message at a time.
-        PolicyMap policyMap = new PolicyMap();        
-        PolicyEntry policy = new PolicyEntry();
-        policy.setMemoryLimit(1);        
-        policyMap.setDefaultEntry(policy);        
-        service.setDestinationPolicy(policyMap);
-        
-        connector = service.addConnector("tcp://localhost:0");        
-        return service;
-    }
-    
-    protected void tearDown() throws Exception {
-    	TcpTransport t = (TcpTransport) connection.getTransport().narrow(TcpTransport.class);
-    	t.getTransportListener().onException(new IOException("Disposed."));
-    	connection.getTransport().stop();
-    	super.tearDown();
-    }
-    
-    protected ConnectionFactory createConnectionFactory() throws Exception {
-        return new ActiveMQConnectionFactory(connector.getConnectUri());
-    }
-}
+package org.apache.activemq;
+
+import java.io.IOException;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.transport.tcp.TcpTransport;
+
+import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
+import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
+
+public class ProducerFlowControlTest extends JmsTestSupport {
+	
+	ActiveMQQueue queueA = new ActiveMQQueue("QUEUE.A");
+	ActiveMQQueue queueB = new ActiveMQQueue("QUEUE.B");
+	private TransportConnector connector;
+	private ActiveMQConnection connection;
+
+    public void test2ndPubisherWithSyncSendConnectionThatIsBlocked() throws Exception {
+        ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory) createConnectionFactory();
+        factory.setUseSyncSend(true);
+        connection = (ActiveMQConnection) factory.createConnection();
+        connections.add(connection);
+    	connection.start();
+
+    	// Test sending to Queue A
+    	// 1st send should not block.
+    	fillQueue(queueA);
+    	
+    	Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+    	MessageConsumer consumer = session.createConsumer(queueB);
+
+    	// Test sending to Queue B it should block. 
+    	// Since even though  the it's queue limits have not been reached, the connection
+    	// is blocked.
+    	CountDownLatch pubishDoneToQeueuB = asyncSendTo(queueB, "Message 1");
+    	assertTrue( pubishDoneToQeueuB.await(2, TimeUnit.SECONDS) );
+    	
+    	TextMessage msg = (TextMessage) consumer.receive();
+    	assertEquals("Message 1", msg.getText());
+    	msg.acknowledge();
+    	
+    	pubishDoneToQeueuB = asyncSendTo(queueB, "Message 2");
+    	assertTrue( pubishDoneToQeueuB.await(2, TimeUnit.SECONDS) );
+    	
+    	msg = (TextMessage) consumer.receive();
+    	assertEquals("Message 2", msg.getText());
+    	msg.acknowledge();
+    }
+
+    public void test2ndPubisherWithStandardConnectionThatIsBlocked() throws Exception {
+        ConnectionFactory factory = createConnectionFactory();
+        connection = (ActiveMQConnection) factory.createConnection();
+        connections.add(connection);
+    	connection.start();
+
+    	// Test sending to Queue A
+    	// 1st send should not block.
+    	fillQueue(queueA);
+
+    	// Test sending to Queue B it should block. 
+    	// Since even though  the it's queue limits have not been reached, the connection
+    	// is blocked.
+    	CountDownLatch pubishDoneToQeueuB = asyncSendTo(queueB, "Message 1");
+    	assertFalse( pubishDoneToQeueuB.await(2, TimeUnit.SECONDS) );    	
+    }
+
+
+	private void fillQueue(final ActiveMQQueue queue) throws JMSException, InterruptedException {
+		final AtomicBoolean done = new AtomicBoolean(true);
+		final AtomicBoolean keepGoing = new AtomicBoolean(true);
+		
+		// Starts an async thread that every time it publishes it sets the done flag to false.
+		// Once the send starts to block it will not reset the done flag anymore.
+		new Thread("Fill thread.") {
+			public void run() {
+				Session session=null;
+		    	try {
+					session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+					MessageProducer producer = session.createProducer(queue);
+					producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+					while( keepGoing.get() ) {
+						done.set(false);
+						producer.send(session.createTextMessage("Hello World"));						
+					}
+				} catch (JMSException e) {
+				} finally {
+					safeClose(session);
+				}
+			}
+		}.start();
+		
+		while( true ) {
+			Thread.sleep(1000);
+			// the producer is blocked once the done flag stays true.
+			if( done.get() )
+				break;
+			done.set(true);
+		}		
+		keepGoing.set(false);
+	}
+
+	private CountDownLatch asyncSendTo(final ActiveMQQueue queue, final String message) throws JMSException {
+		final CountDownLatch done = new CountDownLatch(1);
+		new Thread("Send thread.") {
+			public void run() {
+				Session session=null;
+		    	try {
+					session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+					MessageProducer producer = session.createProducer(queue);
+					producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+					producer.send(session.createTextMessage(message));
+					done.countDown();
+				} catch (JMSException e) {
+				} finally {
+					safeClose(session);
+				}
+			}
+		}.start();    	
+		return done;
+	}
+
+    protected BrokerService createBroker() throws Exception {
+        BrokerService service = new BrokerService();
+        service.setPersistent(false);
+        service.setUseJmx(true);
+        
+        // Setup a destination policy where it takes only 1 message at a time.
+        PolicyMap policyMap = new PolicyMap();        
+        PolicyEntry policy = new PolicyEntry();
+        policy.setMemoryLimit(1);        
+        policyMap.setDefaultEntry(policy);        
+        service.setDestinationPolicy(policyMap);
+        
+        connector = service.addConnector("tcp://localhost:0");        
+        return service;
+    }
+    
+    protected void tearDown() throws Exception {
+    	TcpTransport t = (TcpTransport) connection.getTransport().narrow(TcpTransport.class);
+    	t.getTransportListener().onException(new IOException("Disposed."));
+    	connection.getTransport().stop();
+    	super.tearDown();
+    }
+    
+    protected ConnectionFactory createConnectionFactory() throws Exception {
+        return new ActiveMQConnectionFactory(connector.getConnectUri());
+    }
+}

Propchange: activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/TimeStampTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/command/MessageCompressionTest.java
------------------------------------------------------------------------------
    svn:eol-style = native