You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/01/02 18:45:19 UTC

svn commit: r1427878 - in /activemq/trunk/activemq-core/src/test/java/org/apache/activemq: broker/region/cursors/NegativeQueueTest.java bugs/AMQ2513Test.java usecases/ExpiredMessagesWithNoConsumerTest.java

Author: tabish
Date: Wed Jan  2 17:45:19 2013
New Revision: 1427878

URL: http://svn.apache.org/viewvc?rev=1427878&view=rev
Log:
fix test cases after changes in https://issues.apache.org/jira/browse/AMQ-4237 broke the tests' queue MBean lookup

Modified:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2513Test.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java?rev=1427878&r1=1427877&r2=1427878&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java Wed Jan  2 17:45:19 2013
@@ -55,102 +55,102 @@ import org.slf4j.LoggerFactory;
 
 /**
  * Modified CursorSupport Unit test to reproduce the negative queue issue.
- * 
+ *
  * Keys to reproducing:
  * 1) Consecutive queues with listener on first sending to second queue
  * 2) Push each queue to the memory limit
  *      This seems to help reproduce the issue more consistently, but
  *      we have seen times in our production environment where the
  *      negative queue can occur without. Our memory limits are
- *      very high in production and it still happens in varying 
+ *      very high in production and it still happens in varying
  *      frequency.
  * 3) Prefetch
- *      Lowering the prefetch down to 10 and below seems to help 
- *      reduce occurrences. 
+ *      Lowering the prefetch down to 10 and below seems to help
+ *      reduce occurrences.
  * 4) # of consumers per queue
  *      The issue occurs less with fewer consumers
- * 
+ *
  * Things that do not affect reproduction:
  * 1) Spring - we use spring in our production applications, but this test case works
  *      with or without it.
  * 2) transacted
- * 
+ *
  */
 public class NegativeQueueTest extends AutoFailTestSupport {
     private static final Logger LOG = LoggerFactory.getLogger(NegativeQueueTest.class);
-    
+
     public static SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMdd,hh:mm:ss:SSS");
-    
+
     private static final String QUEUE_1_NAME = "conn.test.queue.1";
     private static final String QUEUE_2_NAME = "conn.test.queue.2";
-    
+
     private static final long QUEUE_MEMORY_LIMIT = 2097152;
     private static final long MEMORY_USAGE = 400000000;
     private static final long TEMP_USAGE = 200000000;
     private static final long STORE_USAGE = 1000000000;
     private static final int MESSAGE_COUNT = 1100;
-    
+
     protected static final boolean TRANSACTED = true;
     protected static final boolean DEBUG = true;
-    protected static int NUM_CONSUMERS = 20;    
-    protected static int PREFETCH_SIZE = 1000;  
-    
+    protected static int NUM_CONSUMERS = 20;
+    protected static int PREFETCH_SIZE = 1000;
+
     protected BrokerService broker;
     protected String bindAddress = "tcp://localhost:0";
-    
+
     public void testWithDefaultPrefetch() throws Exception{
         PREFETCH_SIZE = 1000;
         NUM_CONSUMERS = 20;
         blastAndConsume();
     }
-    
+
     public void x_testWithDefaultPrefetchFiveConsumers() throws Exception{
         PREFETCH_SIZE = 1000;
         NUM_CONSUMERS = 5;
         blastAndConsume();
     }
-    
+
     public void x_testWithDefaultPrefetchTwoConsumers() throws Exception{
         PREFETCH_SIZE = 1000;
         NUM_CONSUMERS = 2;
         blastAndConsume();
     }
-    
+
     public void testWithDefaultPrefetchOneConsumer() throws Exception{
         PREFETCH_SIZE = 1000;
         NUM_CONSUMERS = 1;
         blastAndConsume();
     }
-    
+
     public void testWithMediumPrefetch() throws Exception{
         PREFETCH_SIZE = 50;
         NUM_CONSUMERS = 20;
         blastAndConsume();
-    }   
-    
+    }
+
     public void x_testWithSmallPrefetch() throws Exception{
         PREFETCH_SIZE = 10;
         NUM_CONSUMERS = 20;
         blastAndConsume();
     }
-    
+
     public void testWithNoPrefetch() throws Exception{
         PREFETCH_SIZE = 1;
         NUM_CONSUMERS = 20;
         blastAndConsume();
     }
-    
+
     public void blastAndConsume() throws Exception {
         LOG.info(getName());
         ConnectionFactory factory = createConnectionFactory();
-        
+
         //get proxy queues for statistics lookups
         Connection proxyConnection = factory.createConnection();
         proxyConnection.start();
         Session proxySession = proxyConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        final QueueViewMBean proxyQueue1 = getProxyToQueueViewMBean((Queue)proxySession.createQueue(QUEUE_1_NAME));
-        final QueueViewMBean proxyQueue2 = getProxyToQueueViewMBean((Queue)proxySession.createQueue(QUEUE_2_NAME));
-        
+        final QueueViewMBean proxyQueue1 = getProxyToQueueViewMBean(proxySession.createQueue(QUEUE_1_NAME));
+        final QueueViewMBean proxyQueue2 = getProxyToQueueViewMBean(proxySession.createQueue(QUEUE_2_NAME));
+
         // LOAD THE QUEUE
         Connection producerConnection = factory.createConnection();
         producerConnection.start();
@@ -168,7 +168,7 @@ public class NegativeQueueTest extends A
                 System.out.print(index-((index/10)*10));
             }
         }
-        
+
         //get access to the Queue info
         if(DEBUG){
             System.out.println("");
@@ -176,16 +176,16 @@ public class NegativeQueueTest extends A
             System.out.println("Queue1 Memory % Used = "+proxyQueue1.getMemoryPercentUsage());
             System.out.println("Queue1 Memory Available = "+proxyQueue1.getMemoryLimit());
         }
-        
+
         // FLUSH THE QUEUE
         final CountDownLatch latch1 = new CountDownLatch(1);
-        final CountDownLatch latch2 = new CountDownLatch(1);        
+        final CountDownLatch latch2 = new CountDownLatch(1);
         Connection[] consumerConnections1 = new Connection[NUM_CONSUMERS];
         List<Message> consumerList1 = new ArrayList<Message>();
         Connection[] consumerConnections2 = new Connection[NUM_CONSUMERS];
         Connection[] producerConnections2 = new Connection[NUM_CONSUMERS];
         List<Message> consumerList2 = new ArrayList<Message>();
-        
+
         for(int ix=0; ix<NUM_CONSUMERS; ix++){
             producerConnections2[ix] = factory.createConnection();
             producerConnections2[ix].start();
@@ -194,7 +194,7 @@ public class NegativeQueueTest extends A
             MessageConsumer consumer = consumerSession.createConsumer(session.createQueue(QUEUE_1_NAME));
             consumer.setMessageListener(new SessionAwareMessageListener(producerConnections2[ix], consumerSession, QUEUE_2_NAME, latch1, consumerList1));
         }
-        
+
         latch1.await(200000, TimeUnit.MILLISECONDS);
         if(DEBUG){
             System.out.println("");
@@ -202,15 +202,16 @@ public class NegativeQueueTest extends A
             System.out.println("Queue2 Memory % Used = "+proxyQueue2.getMemoryPercentUsage());
             System.out.println("Queue2 Memory Available = "+proxyQueue2.getMemoryLimit());
         }
-        
+
         for(int ix=0; ix<NUM_CONSUMERS; ix++){
             consumerConnections2[ix] = getConsumerConnection(factory);
             Session consumerSession = consumerConnections2[ix].createSession(TRANSACTED, Session.AUTO_ACKNOWLEDGE);
             MessageConsumer consumer = consumerSession.createConsumer(session.createQueue(QUEUE_2_NAME));
             consumer.setMessageListener(new SessionAwareMessageListener(consumerSession, latch2, consumerList2));
         }
-        
+
         boolean success = Wait.waitFor(new Wait.Condition() {
+            @Override
             public boolean isSatisified() throws Exception {
                 boolean done = latch2.await(10, TimeUnit.SECONDS);
                 if(DEBUG){
@@ -235,10 +236,10 @@ public class NegativeQueueTest extends A
             consumerConnections2[ix].close();
             producerConnections2[ix].close();
         }
-        
+
         //let the consumer statistics on queue2 have time to update
         Thread.sleep(500);
-        
+
         if(DEBUG){
             System.out.println("");
             System.out.println("Queue1 Size = "+proxyQueue1.getQueueSize());
@@ -248,37 +249,39 @@ public class NegativeQueueTest extends A
         }
 
         Wait.waitFor(new Wait.Condition() {
+            @Override
             public boolean isSatisified() throws Exception {
                 return 0 == proxyQueue1.getQueueSize();
             }});
         assertEquals("Queue1 has gone negative,",0, proxyQueue1.getQueueSize());
-        
+
         Wait.waitFor(new Wait.Condition() {
+            @Override
             public boolean isSatisified() throws Exception {
                 return 0 == proxyQueue2.getQueueSize();
             }});
         assertEquals("Queue2 has gone negative,",0, proxyQueue2.getQueueSize());
         proxyConnection.close();
-        
+
     }
 
-    private QueueViewMBean getProxyToQueueViewMBean(Queue queue)
-        throws MalformedObjectNameException, JMSException {
-        
-        ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq" + ":Type=Queue,Destination=" + 
-            queue.getQueueName() + ",BrokerName=localhost");
-        QueueViewMBean proxy = (QueueViewMBean) broker.getManagementContext().newProxyInstance(queueViewMBeanName,
-                QueueViewMBean.class, true);
-     
+    private QueueViewMBean getProxyToQueueViewMBean(Queue queue) throws MalformedObjectNameException, JMSException {
+        final String prefix = "org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=";
+
+        ObjectName queueViewMBeanName = new ObjectName(prefix + queue.getQueueName());
+        QueueViewMBean proxy = (QueueViewMBean)
+            broker.getManagementContext().newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
+
         return proxy;
     }
-    
+
    protected Connection getConsumerConnection(ConnectionFactory fac) throws JMSException {
         Connection connection = fac.createConnection();
         connection.start();
         return connection;
     }
 
+    @Override
     protected void setUp() throws Exception {
         if (broker == null) {
             broker = createBroker();
@@ -286,6 +289,7 @@ public class NegativeQueueTest extends A
         super.setUp();
     }
 
+    @Override
     protected void tearDown() throws Exception {
         super.tearDown();
         if (broker != null) {
@@ -312,18 +316,18 @@ public class NegativeQueueTest extends A
         bindAddress = answer.getTransportConnectors().get(0).getConnectUri().toString();
         return answer;
     }
-    
+
     protected void configureBroker(BrokerService answer) throws Exception {
         PolicyEntry policy = new PolicyEntry();
-        policy.setMemoryLimit(QUEUE_MEMORY_LIMIT); 
+        policy.setMemoryLimit(QUEUE_MEMORY_LIMIT);
         policy.setPendingQueuePolicy(new StorePendingQueueMessageStoragePolicy());
-        
+
         // disable the cache to be sure setBatch is the problem
         // will get lots of duplicates
         // real problem is sync between cursor and store add - leads to out or order messages
         // in the cursor so setBatch can break.
         // policy.setUseCache(false);
-        
+
         PolicyMap pMap = new PolicyMap();
         pMap.setDefaultEntry(policy);
         answer.setDestinationPolicy(pMap);
@@ -331,14 +335,14 @@ public class NegativeQueueTest extends A
         answer.addConnector("tcp://localhost:0");
 
         MemoryUsage memoryUsage = new MemoryUsage();
-        memoryUsage.setLimit(MEMORY_USAGE); 
+        memoryUsage.setLimit(MEMORY_USAGE);
         memoryUsage.setPercentUsageMinDelta(20);
 
         TempUsage tempUsage = new TempUsage();
-        tempUsage.setLimit(TEMP_USAGE); 
+        tempUsage.setLimit(TEMP_USAGE);
 
         StoreUsage storeUsage = new StoreUsage();
-        storeUsage.setLimit(STORE_USAGE); 
+        storeUsage.setLimit(STORE_USAGE);
 
         SystemUsage systemUsage = new SystemUsage();
         systemUsage.setMemoryUsage(memoryUsage);
@@ -346,27 +350,27 @@ public class NegativeQueueTest extends A
         systemUsage.setStoreUsage(storeUsage);
         answer.setSystemUsage(systemUsage);
     }
-    
+
     /**
      * Message listener that is given the Session for transacted consumers
      */
     class SessionAwareMessageListener implements MessageListener{
-        private List<Message> consumerList;
-        private CountDownLatch latch;
-        private Session consumerSession;
+        private final List<Message> consumerList;
+        private final CountDownLatch latch;
+        private final Session consumerSession;
         private Session producerSession;
         private MessageProducer producer;
-        
+
         public SessionAwareMessageListener(Session consumerSession, CountDownLatch latch, List<Message> consumerList){
             this(null, consumerSession, null, latch, consumerList);
         }
-        
-        public SessionAwareMessageListener(Connection producerConnection, Session consumerSession, String outQueueName, 
+
+        public SessionAwareMessageListener(Connection producerConnection, Session consumerSession, String outQueueName,
                 CountDownLatch latch, List<Message> consumerList){
             this.consumerList = consumerList;
             this.latch = latch;
             this.consumerSession = consumerSession;
-            
+
             if(producerConnection != null){
                 try {
                     producerSession = producerConnection.createSession(TRANSACTED, Session.AUTO_ACKNOWLEDGE);
@@ -377,7 +381,8 @@ public class NegativeQueueTest extends A
                 }
             }
         }
-        
+
+        @Override
         public void onMessage(Message msg) {
             try {
                 if(producer == null){
@@ -392,7 +397,7 @@ public class NegativeQueueTest extends A
             } catch (Exception e) {
                 e.printStackTrace();
             }
-            
+
             synchronized(consumerList){
                 consumerList.add(msg);
                 if(DEBUG && consumerList.size()%100 == 0) {
@@ -411,5 +416,5 @@ public class NegativeQueueTest extends A
                 }
             }
         }
-    }    
+    }
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2513Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2513Test.java?rev=1427878&r1=1427877&r2=1427878&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2513Test.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2513Test.java Wed Jan  2 17:45:19 2013
@@ -98,7 +98,8 @@ public class AMQ2513Test extends TestCas
 
     DestinationViewMBean createView() throws Exception {
         String domain = "org.apache.activemq";
-        ObjectName name = new ObjectName(domain + ":BrokerName=localhost,Type=Queue,Destination=test");
+        ObjectName name = new ObjectName(domain + ":type=Broker,brokerName=localhost," +
+                                                  "destinationType=Queue,destinationName=test");
         return (DestinationViewMBean) broker.getManagementContext().newProxyInstance(name, DestinationViewMBean.class,
                 true);
     }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java?rev=1427878&r1=1427877&r2=1427878&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java Wed Jan  2 17:45:19 2013
@@ -163,10 +163,6 @@ public class ExpiredMessagesWithNoConsum
 
         TimeUnit.SECONDS.sleep(5);
 
-        for (ObjectName name : broker.getAdminView().getQueues()) {
-            LOG.info("Broker Queue: {}", name);
-        }
-
         final DestinationViewMBean view = createView(destination);
         Wait.waitFor(new Wait.Condition() {
             @Override
@@ -581,8 +577,6 @@ public class ExpiredMessagesWithNoConsum
             name = new ObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Topic,destinationName=test");
         }
 
-        LOG.info("Attempting to find Queue named: {}", name);
-
         return (DestinationViewMBean) broker.getManagementContext().newProxyInstance(name, DestinationViewMBean.class, true);
     }