You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/02/07 19:20:32 UTC

svn commit: r1443642 - in /activemq/trunk/activemq-spring/src/test/java/org/apache/bugs: AMQ1730Test.java AMQ2754Test.java LoadBalanceTest.java

Author: tabish
Date: Thu Feb  7 18:20:32 2013
New Revision: 1443642

URL: http://svn.apache.org/r1443642
Log:
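Don't use hardcoded ports. Bind the test brokers to an ephemeral port (port 0) and build the client and network connector URIs from each connector's publishable connect string. Also wait for broker shutdown to complete in AMQ1730Test and LoadBalanceTest.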

Modified:
    activemq/trunk/activemq-spring/src/test/java/org/apache/bugs/AMQ1730Test.java
    activemq/trunk/activemq-spring/src/test/java/org/apache/bugs/AMQ2754Test.java
    activemq/trunk/activemq-spring/src/test/java/org/apache/bugs/LoadBalanceTest.java

Modified: activemq/trunk/activemq-spring/src/test/java/org/apache/bugs/AMQ1730Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-spring/src/test/java/org/apache/bugs/AMQ1730Test.java?rev=1443642&r1=1443641&r2=1443642&view=diff
==============================================================================
--- activemq/trunk/activemq-spring/src/test/java/org/apache/bugs/AMQ1730Test.java (original)
+++ activemq/trunk/activemq-spring/src/test/java/org/apache/bugs/AMQ1730Test.java Thu Feb  7 18:20:32 2013
@@ -17,12 +17,7 @@
 
 package org.apache.bugs;
 
-import junit.framework.TestCase;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.jms.listener.DefaultMessageListenerContainer;
+import java.util.concurrent.CountDownLatch;
 
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
@@ -33,17 +28,20 @@ import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.TextMessage;
-import java.util.concurrent.CountDownLatch;
 
+import junit.framework.TestCase;
 
-public class AMQ1730Test extends TestCase {
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.jms.listener.DefaultMessageListenerContainer;
 
-    private static final Logger log = LoggerFactory.getLogger(AMQ1730Test.class);
 
+public class AMQ1730Test extends TestCase {
 
+    private static final Logger log = LoggerFactory.getLogger(AMQ1730Test.class);
     private static final String JMSX_DELIVERY_COUNT = "JMSXDeliveryCount";
-
-
     BrokerService brokerService;
 
     private static final int MESSAGE_COUNT = 250;
@@ -55,6 +53,7 @@ public class AMQ1730Test extends TestCas
     @Override
     protected void setUp() throws Exception {
         super.setUp();
+
         brokerService = new BrokerService();
         brokerService.addConnector("tcp://localhost:0");
         brokerService.setUseJmx(false);
@@ -65,6 +64,7 @@ public class AMQ1730Test extends TestCas
     protected void tearDown() throws Exception {
         super.tearDown();
         brokerService.stop();
+        brokerService.waitUntilStopped();
     }
 
     public void testRedelivery() throws Exception {
@@ -109,7 +109,7 @@ public class AMQ1730Test extends TestCas
         messageListenerContainer.setSessionTransacted(false);
         messageListenerContainer.setMessageListener(new MessageListener() {
 
-
+            @Override
             public void onMessage(Message message) {
                 if (!(message instanceof TextMessage)) {
                     throw new RuntimeException();
@@ -159,7 +159,5 @@ public class AMQ1730Test extends TestCas
         T get() {
             return value;
         }
-
     }
-
 }

Modified: activemq/trunk/activemq-spring/src/test/java/org/apache/bugs/AMQ2754Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-spring/src/test/java/org/apache/bugs/AMQ2754Test.java?rev=1443642&r1=1443641&r2=1443642&view=diff
==============================================================================
--- activemq/trunk/activemq-spring/src/test/java/org/apache/bugs/AMQ2754Test.java (original)
+++ activemq/trunk/activemq-spring/src/test/java/org/apache/bugs/AMQ2754Test.java Thu Feb  7 18:20:32 2013
@@ -35,11 +35,11 @@ import org.apache.activemq.broker.Broker
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.network.NetworkConnector;
 import org.apache.activemq.pool.PooledConnectionFactory;
-//import org.apache.activemq.pool.PooledConnectionFactory;
 import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
 import org.springframework.jms.core.JmsTemplate;
 import org.springframework.jms.core.MessageCreator;
 import org.springframework.jms.listener.DefaultMessageListenerContainer;
+//import org.apache.activemq.pool.PooledConnectionFactory;
 
 public class AMQ2754Test extends TestCase {
 
@@ -47,110 +47,117 @@ public class AMQ2754Test extends TestCas
         BrokerService brokerService1 = null;
         BrokerService brokerService2 = null;
 
+        String broker1Uri;
+        String broker2Uri;
+
         final int total = 100;
         final CountDownLatch latch = new CountDownLatch(total);
         final boolean conduitSubscriptions = true;
         try {
 
-        {
-            brokerService1 = new BrokerService();
-            brokerService1.setBrokerName("consumer");
-            brokerService1.setUseJmx(false);
-            brokerService1.setPersistenceAdapter(new MemoryPersistenceAdapter());
-            brokerService1.addConnector("tcp://0.0.0.0:61616");
-            brokerService1.start();
-        }
+            {
+                brokerService1 = new BrokerService();
+                brokerService1.setBrokerName("consumer");
+                brokerService1.setUseJmx(false);
+                brokerService1.setPersistenceAdapter(new MemoryPersistenceAdapter());
+                broker1Uri = brokerService1.addConnector("tcp://0.0.0.0:0").getPublishableConnectString();
+                brokerService1.start();
+            }
 
-        {
-            brokerService2 = new BrokerService();
-            brokerService2.setBrokerName("producer");
-            brokerService2.setUseJmx(false);
-            brokerService2.setPersistenceAdapter(new MemoryPersistenceAdapter());
-            brokerService2.addConnector("tcp://0.0.0.0:51515");
-            NetworkConnector network2 = brokerService2.addNetworkConnector("static:(tcp://localhost:61616)");
-            network2.setName("network1");
-            network2.setDynamicOnly(true);
-            network2.setConduitSubscriptions(conduitSubscriptions);
-            network2.setNetworkTTL(3);
-            network2.setPrefetchSize(1);
-            brokerService2.start();
-        }
+            {
+                brokerService2 = new BrokerService();
+                brokerService2.setBrokerName("producer");
+                brokerService2.setUseJmx(false);
+                brokerService2.setPersistenceAdapter(new MemoryPersistenceAdapter());
+                broker2Uri = brokerService2.addConnector("tcp://0.0.0.0:0").getPublishableConnectString();
+                NetworkConnector network2 = brokerService2.addNetworkConnector("static:("+broker1Uri+")");
+                network2.setName("network1");
+                network2.setDynamicOnly(true);
+                network2.setConduitSubscriptions(conduitSubscriptions);
+                network2.setNetworkTTL(3);
+                network2.setPrefetchSize(1);
+                brokerService2.start();
+            }
 
-        ExecutorService pool = Executors.newSingleThreadExecutor();
+            ExecutorService pool = Executors.newSingleThreadExecutor();
 
-        ActiveMQConnectionFactory connectionFactory1 = 
-            new ActiveMQConnectionFactory("failover:(tcp://localhost:61616)");
+            ActiveMQConnectionFactory connectionFactory1 =
+                new ActiveMQConnectionFactory("failover:("+broker1Uri+")");
 
-        connectionFactory1.setWatchTopicAdvisories(false);
-        final DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
-        container.setConnectionFactory(connectionFactory1);
-        container.setMaxConcurrentConsumers(10);
-        container.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
-        container.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONSUMER);
-        container.setDestination(new ActiveMQQueue("testingqueue"));
-        container.setMessageListener(new MessageListener() {
-            public void onMessage(Message message) {
-                latch.countDown();
-            }
-        });
-        container.setMaxMessagesPerTask(1);
-        container.afterPropertiesSet();
-        container.start();
-
-        pool.submit(new Callable<Object>() {
-            public Object call() throws Exception {
-                try {
-                    final int batch = 10;
-                    ActiveMQConnectionFactory connectionFactory2 = 
-                        new ActiveMQConnectionFactory("failover:(tcp://localhost:51515)");
-                    PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(connectionFactory2);
-                    connectionFactory2.setWatchTopicAdvisories(false);
-                    JmsTemplate template = new JmsTemplate(pooledConnectionFactory);
-                    ActiveMQQueue queue = new ActiveMQQueue("testingqueue");
-                    for(int b = 0; b < batch; b++) {
-                        for(int i = 0; i < (total / batch); i++) {
-                            final String id = ":batch=" + b + "i=" + i;
-                            template.send(queue, new MessageCreator() {
-                                public Message createMessage(Session session) throws JMSException {
-                                    TextMessage message = session.createTextMessage();
-                                    message.setText("Hello World!" + id);
-                                    return message;
-                                }
-                            });
-                        }
-                        // give spring time to scale back again
-                        while(container.getActiveConsumerCount() > 1) {
-                            System.out.println("active consumer count:" + container.getActiveConsumerCount());
-                            System.out.println("concurrent consumer count: " + container.getConcurrentConsumers());
-                            Thread.sleep(1000);
+            connectionFactory1.setWatchTopicAdvisories(false);
+            final DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
+            container.setConnectionFactory(connectionFactory1);
+            container.setMaxConcurrentConsumers(10);
+            container.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
+            container.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONSUMER);
+            container.setDestination(new ActiveMQQueue("testingqueue"));
+            container.setMessageListener(new MessageListener() {
+                @Override
+                public void onMessage(Message message) {
+                    latch.countDown();
+                }
+            });
+            container.setMaxMessagesPerTask(1);
+            container.afterPropertiesSet();
+            container.start();
+
+            final String finalBroker2Uri = broker2Uri;
+
+            pool.submit(new Callable<Object>() {
+                @Override
+                public Object call() throws Exception {
+                    try {
+                        final int batch = 10;
+                        ActiveMQConnectionFactory connectionFactory2 =
+                            new ActiveMQConnectionFactory("failover:("+finalBroker2Uri+")");
+                        PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(connectionFactory2);
+                        connectionFactory2.setWatchTopicAdvisories(false);
+                        JmsTemplate template = new JmsTemplate(pooledConnectionFactory);
+                        ActiveMQQueue queue = new ActiveMQQueue("testingqueue");
+                        for(int b = 0; b < batch; b++) {
+                            for(int i = 0; i < (total / batch); i++) {
+                                final String id = ":batch=" + b + "i=" + i;
+                                template.send(queue, new MessageCreator() {
+                                    @Override
+                                    public Message createMessage(Session session) throws JMSException {
+                                        TextMessage message = session.createTextMessage();
+                                        message.setText("Hello World!" + id);
+                                        return message;
+                                    }
+                                });
+                            }
+                            // give spring time to scale back again
+                            while(container.getActiveConsumerCount() > 1) {
+                                System.out.println("active consumer count:" + container.getActiveConsumerCount());
+                                System.out.println("concurrent consumer count: " + container.getConcurrentConsumers());
+                                Thread.sleep(1000);
+                            }
                         }
+                        //pooledConnectionFactory.stop();
+                    } catch(Throwable t) {
+                        t.printStackTrace();
                     }
-                    //pooledConnectionFactory.stop();
-                } catch(Throwable t) {
-                    t.printStackTrace();
+                    return null;
                 }
-                return null;
-            }
-        });
-
-        pool.shutdown();
-        pool.awaitTermination(10, TimeUnit.SECONDS);
+            });
 
-        int count = 0;
+            pool.shutdown();
+            pool.awaitTermination(10, TimeUnit.SECONDS);
 
-        // give it 20 seconds
-        while(!latch.await(1, TimeUnit.SECONDS) && count++ < 20) {
-            System.out.println("count " + latch.getCount());
-        }
+            int count = 0;
 
+            // give it 20 seconds
+            while(!latch.await(1, TimeUnit.SECONDS) && count++ < 20) {
+                System.out.println("count " + latch.getCount());
+            }
 
-        container.destroy();
+            container.destroy();
 
         } finally {
-            try { if(brokerService1 != null) { 
+            try { if(brokerService1 != null) {
                 brokerService1.stop();
             }} catch(Throwable t) { t.printStackTrace(); }
-            try { if(brokerService2 != null) { 
+            try { if(brokerService2 != null) {
                 brokerService2.stop();
             }} catch(Throwable t) { t.printStackTrace(); }
         }
@@ -158,7 +165,5 @@ public class AMQ2754Test extends TestCas
         if(latch.getCount() > 0) {
             fail("latch should have gone down to 0 but was " + latch.getCount());
         }
-
     }
-
-} 
\ No newline at end of file
+}
\ No newline at end of file

Modified: activemq/trunk/activemq-spring/src/test/java/org/apache/bugs/LoadBalanceTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-spring/src/test/java/org/apache/bugs/LoadBalanceTest.java?rev=1443642&r1=1443641&r2=1443642&view=diff
==============================================================================
--- activemq/trunk/activemq-spring/src/test/java/org/apache/bugs/LoadBalanceTest.java (original)
+++ activemq/trunk/activemq-spring/src/test/java/org/apache/bugs/LoadBalanceTest.java Thu Feb  7 18:20:32 2013
@@ -39,9 +39,9 @@ import org.apache.activemq.command.Activ
 import org.apache.activemq.network.NetworkConnector;
 import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
 import org.junit.Ignore;
+import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.junit.Test;
 import org.springframework.jms.connection.CachingConnectionFactory;
 import org.springframework.jms.connection.SingleConnectionFactory;
 import org.springframework.jms.core.JmsTemplate;
@@ -61,48 +61,49 @@ public class LoadBalanceTest {
         final AtomicInteger broker1Count = new AtomicInteger(0);
         final AtomicInteger broker2Count = new AtomicInteger(0);
         final CountDownLatch startProducer = new CountDownLatch(1);
+
+        String broker1Uri;
+        String broker2Uri;
+
         try {
-            {
-                brokerService1 = new BrokerService();
-                brokerService1.setBrokerName("one");
-                brokerService1.setUseJmx(false);
-                brokerService1
-                        .setPersistenceAdapter(new MemoryPersistenceAdapter());
-                brokerService1.addConnector("nio://0.0.0.0:61616");
-                final NetworkConnector network1 = brokerService1
-                        .addNetworkConnector("static:(tcp://localhost:51515)");
-                network1.setName("network1");
-                network1.setDynamicOnly(true);
-                network1.setNetworkTTL(3);
-                network1.setPrefetchSize(networkBridgePrefetch);
-                network1.setConduitSubscriptions(false);
-                network1.setDecreaseNetworkConsumerPriority(false);
-                network1.setDispatchAsync(false);
-                brokerService1.start();
-            }
-            {
-                brokerService2 = new BrokerService();
-                brokerService2.setBrokerName("two");
-                brokerService2.setUseJmx(false);
-                brokerService2
-                        .setPersistenceAdapter(new MemoryPersistenceAdapter());
-                brokerService2.addConnector("nio://0.0.0.0:51515");
-                final NetworkConnector network2 = brokerService2
-                        .addNetworkConnector("static:(tcp://localhost:61616)");
-                network2.setName("network1");
-                network2.setDynamicOnly(true);
-                network2.setNetworkTTL(3);
-                network2.setPrefetchSize(networkBridgePrefetch);
-                network2.setConduitSubscriptions(false);
-                network2.setDecreaseNetworkConsumerPriority(false);
-                network2.setDispatchAsync(false);
-                brokerService2.start();
-            }
+
+
+            brokerService1 = new BrokerService();
+            brokerService1.setBrokerName("one");
+            brokerService1.setUseJmx(false);
+            brokerService1.setPersistenceAdapter(new MemoryPersistenceAdapter());
+            broker1Uri = brokerService1.addConnector("nio://0.0.0.0:0").getPublishableConnectString();
+
+            brokerService2 = new BrokerService();
+            brokerService2.setBrokerName("two");
+            brokerService2.setUseJmx(false);
+            brokerService2.setPersistenceAdapter(new MemoryPersistenceAdapter());
+            broker2Uri = brokerService2.addConnector("nio://0.0.0.0:0").getPublishableConnectString();
+
+            final NetworkConnector network1 = brokerService1.addNetworkConnector("static:("+broker2Uri+")");
+            network1.setName("network1");
+            network1.setDynamicOnly(true);
+            network1.setNetworkTTL(3);
+            network1.setPrefetchSize(networkBridgePrefetch);
+            network1.setConduitSubscriptions(false);
+            network1.setDecreaseNetworkConsumerPriority(false);
+            network1.setDispatchAsync(false);
+
+            final NetworkConnector network2 = brokerService2.addNetworkConnector("static:("+broker1Uri+")");
+            network2.setName("network1");
+            network2.setDynamicOnly(true);
+            network2.setNetworkTTL(3);
+            network2.setPrefetchSize(networkBridgePrefetch);
+            network2.setConduitSubscriptions(false);
+            network2.setDecreaseNetworkConsumerPriority(false);
+            network2.setDispatchAsync(false);
+
+            brokerService1.start();
+            brokerService2.start();
+
             final ExecutorService pool = Executors.newSingleThreadExecutor();
-            final ActiveMQConnectionFactory connectionFactory1 = new ActiveMQConnectionFactory(
-                    "vm://one");
-            final SingleConnectionFactory singleConnectionFactory1 = new SingleConnectionFactory(
-                    connectionFactory1);
+            final ActiveMQConnectionFactory connectionFactory1 = new ActiveMQConnectionFactory("vm://one");
+            final SingleConnectionFactory singleConnectionFactory1 = new SingleConnectionFactory(connectionFactory1);
             singleConnectionFactory1.setReconnectOnException(true);
             final DefaultMessageListenerContainer container1 = new DefaultMessageListenerContainer();
             container1.setConnectionFactory(singleConnectionFactory1);
@@ -110,6 +111,7 @@ public class LoadBalanceTest {
             container1.setDestination(new ActiveMQQueue("testingqueue"));
             container1.setMessageListener(new MessageListener() {
 
+                @Override
                 public void onMessage(final Message message) {
                     broker1Count.incrementAndGet();
                 }
@@ -118,6 +120,7 @@ public class LoadBalanceTest {
             container1.start();
             pool.submit(new Callable<Object>() {
 
+                @Override
                 public Object call() throws Exception {
                     try {
                         final ActiveMQConnectionFactory connectionFactory2 = new ActiveMQConnectionFactory(
@@ -133,6 +136,7 @@ public class LoadBalanceTest {
                                 "testingqueue"));
                         container2.setMessageListener(new MessageListener() {
 
+                            @Override
                             public void onMessage(final Message message) {
                                 broker2Count.incrementAndGet();
                             }
@@ -151,6 +155,7 @@ public class LoadBalanceTest {
                         for (int i = 0; i < total; i++) {
                             template.send(queue, new MessageCreator() {
 
+                                @Override
                                 public Message createMessage(
                                         final Session session)
                                         throws JMSException {
@@ -197,6 +202,7 @@ public class LoadBalanceTest {
             try {
                 if (brokerService1 != null) {
                     brokerService1.stop();
+                    brokerService1.waitUntilStopped();
                 }
             } catch (final Throwable t) {
                 t.printStackTrace();
@@ -204,12 +210,13 @@ public class LoadBalanceTest {
             try {
                 if (brokerService2 != null) {
                     brokerService2.stop();
+                    brokerService2.waitUntilStopped();
                 }
             } catch (final Throwable t) {
                 t.printStackTrace();
             }
         }
-        
+
         if (broker1Count.get() < 25 || broker2Count.get() < 25) {
             fail("Each broker should have gotten at least 25 messages but instead broker1 got "
                     + broker1Count.get()
@@ -240,6 +247,7 @@ public class LoadBalanceTest {
         container1.setDestination(new ActiveMQQueue(TESTING_QUEUE));
         container1.setMessageListener(new MessageListener() {
 
+            @Override
             public void onMessage(final Message message) {
                 broker1Count.incrementAndGet();
             }
@@ -248,6 +256,7 @@ public class LoadBalanceTest {
         container1.start();
         pool.submit(new Callable<Object>() {
 
+            @Override
             public Object call() throws Exception {
                 System.setProperty("lbt.brokerName", "two");
                 final ActiveMQConnectionFactory connectionFactory2 = new ActiveMQConnectionFactory(
@@ -261,16 +270,17 @@ public class LoadBalanceTest {
                 container2.setDestination(new ActiveMQQueue(TESTING_QUEUE));
                 container2.setMessageListener(new MessageListener() {
 
+                    @Override
                     public void onMessage(final Message message) {
                         broker2Count.incrementAndGet();
                     }
                 });
                 container2.afterPropertiesSet();
                 container2.start();
-                
-                
+
+
                 assertTrue("wait for start signal", startProducer.await(20, TimeUnit.SECONDS));
-                
+
                 final CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(
                         singleConnectionFactory2);
                 final JmsTemplate template = new JmsTemplate(
@@ -279,6 +289,7 @@ public class LoadBalanceTest {
                 for (int i = 0; i < total; i++) {
                     template.send(queue, new MessageCreator() {
 
+                        @Override
                         public Message createMessage(final Session session)
                                 throws JMSException {
                             final TextMessage message = session
@@ -291,14 +302,14 @@ public class LoadBalanceTest {
                 return null;
             }
         });
-        
+
         // give network a chance to build, needs advisories
         waitForBridgeFormation();
         startProducer.countDown();
-        
+
         pool.shutdown();
         pool.awaitTermination(10, TimeUnit.SECONDS);
-        
+
         LOG.info("broker1Count " + broker1Count.get() + ", broker2Count " + broker2Count.get());
 
         int count = 0;