You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2012/07/17 14:10:26 UTC

svn commit: r1362462 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/Queue.java test/java/org/apache/activemq/JmsQueueBrowserTest.java

Author: gtully
Date: Tue Jul 17 12:10:25 2012
New Revision: 1362462

URL: http://svn.apache.org/viewvc?rev=1362462&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3935 - JConsole browse() function does not work if useCache=false. Add test case and a few other browse variants with useCache=false. Fix ensures a force pagein is done if usecache is false

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsQueueBrowserTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=1362462&r1=1362461&r2=1362462&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Tue Jul 17 12:10:25 2012
@@ -1008,7 +1008,7 @@ public class Queue extends BaseDestinati
     public void doBrowse(List<Message> browseList, int max) {
         final ConnectionContext connectionContext = createConnectionContext();
         try {
-            pageInMessages(false);
+            pageInMessages(!isUseCache());
             List<MessageReference> toExpire = new ArrayList<MessageReference>();
 
             pagedInPendingDispatchLock.writeLock().lock();

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsQueueBrowserTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsQueueBrowserTest.java?rev=1362462&r1=1362461&r2=1362462&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsQueueBrowserTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsQueueBrowserTest.java Tue Jul 17 12:10:25 2012
@@ -19,7 +19,6 @@ package org.apache.activemq;
 import java.util.ArrayList;
 import java.util.Enumeration;
 import java.util.List;
-
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
@@ -27,16 +26,16 @@ import javax.jms.Queue;
 import javax.jms.QueueBrowser;
 import javax.jms.Session;
 import javax.jms.TextMessage;
-
-import org.apache.activemq.broker.StubConnection;
+import javax.management.ObjectName;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.TabularData;
+import junit.framework.Test;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
 import org.apache.activemq.broker.region.BaseDestination;
-import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ConnectionInfo;
-import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.MessageAck;
-import org.apache.activemq.command.ProducerInfo;
-import org.apache.activemq.command.SessionInfo;
 
 /**
  * 
@@ -44,7 +43,11 @@ import org.apache.activemq.command.Sessi
 public class JmsQueueBrowserTest extends JmsTestSupport {
     private static final org.apache.commons.logging.Log LOG = org.apache.commons.logging.LogFactory
             .getLog(JmsQueueBrowserTest.class);
-    
+    public boolean isUseCache = false;
+
+    public static Test suite() throws Exception {
+        return suite(JmsQueueBrowserTest.class);
+    }
 
     /**
      * Tests the queue browser. Browses the messages then the consumer tries to receive them. The messages should still
@@ -105,13 +108,178 @@ public class JmsQueueBrowserTest extends
         consumer.close();
 
     }
-    
+
+    public void initCombosForTestBatchSendBrowseReceive() {
+        addCombinationValues("isUseCache", new Boolean[]{Boolean.TRUE, Boolean.FALSE});
+    }
+
+    public void testBatchSendBrowseReceive() throws Exception {
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        ActiveMQQueue destination = new ActiveMQQueue("TEST");
+        MessageProducer producer = session.createProducer(destination);
+        MessageConsumer consumer = session.createConsumer(destination);
+        connection.start();
+
+        TextMessage[] outbound = new TextMessage[10];
+        for (int i=0; i<10; i++) {
+            outbound[i] = session.createTextMessage( i + " Message");
+        };
+
+        // lets consume any outstanding messages from previous test runs
+        while (consumer.receive(1000) != null) {
+        }
+        consumer.close();
+
+        for (int i=0;i<outbound.length; i++) {
+            producer.send(outbound[i]);
+        }
+
+        QueueBrowser browser = session.createBrowser((Queue) destination);
+        Enumeration enumeration = browser.getEnumeration();
+
+        for (int i=0; i<outbound.length; i++) {
+            assertTrue("should have a", enumeration.hasMoreElements());
+            assertEquals(outbound[i], enumeration.nextElement());
+        }
+        browser.close();
+
+        for (int i=0;i<outbound.length; i++) {
+            producer.send(outbound[i]);
+        }
+
+        // verify second batch is visible to browse
+        browser = session.createBrowser((Queue) destination);
+        enumeration = browser.getEnumeration();
+        for (int j=0; j<2;j++) {
+            for (int i=0; i<outbound.length; i++) {
+                assertTrue("should have a", enumeration.hasMoreElements());
+                assertEquals("j=" + j + ", i=" + i, outbound[i].getText(), ((TextMessage) enumeration.nextElement()).getText());
+            }
+        }
+        browser.close();
+
+        consumer = session.createConsumer(destination);
+        for (int i=0; i<outbound.length * 2; i++) {
+            assertNotNull("Got message: " + i, consumer.receive(2000));
+        }
+        consumer.close();
+    }
+
+
+    public void initCombosForTestBatchSendJmxBrowseReceive() {
+        addCombinationValues("isUseCache", new Boolean[]{Boolean.TRUE, Boolean.FALSE});
+    }
+
+    public void testBatchSendJmxBrowseReceive() throws Exception {
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        ActiveMQQueue destination = new ActiveMQQueue("TEST");
+        MessageProducer producer = session.createProducer(destination);
+        MessageConsumer consumer = session.createConsumer(destination);
+        connection.start();
+
+        TextMessage[] outbound = new TextMessage[10];
+        for (int i=0; i<10; i++) {
+            outbound[i] = session.createTextMessage( i + " Message");
+        };
+
+        // lets consume any outstanding messages from previous test runs
+        while (consumer.receive(1000) != null) {
+        }
+        consumer.close();
+
+        for (int i=0;i<outbound.length; i++) {
+            producer.send(outbound[i]);
+        }
+
+        ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:Type=Queue,Destination=TEST,BrokerName=localhost");
+
+        LOG.info("Create QueueView MBean...");
+        QueueViewMBean proxy = (QueueViewMBean) broker.getManagementContext().newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
+
+        long concount = proxy.getConsumerCount();
+        LOG.info("Consumer Count :" + concount);
+        long messcount = proxy.getQueueSize();
+        LOG.info("current number of messages in the queue :" + messcount);
+
+        // lets browse
+        CompositeData[] compdatalist = proxy.browse();
+        if (compdatalist.length == 0) {
+            fail("There is no message in the queue:");
+        }
+        String[] messageIDs = new String[compdatalist.length];
+
+        for (int i = 0; i < compdatalist.length; i++) {
+            CompositeData cdata = compdatalist[i];
+
+            if (i == 0) {
+                LOG.info("Columns: " + cdata.getCompositeType().keySet());
+            }
+            messageIDs[i] = (String)cdata.get("JMSMessageID");
+            LOG.info("message " + i + " : " + cdata.values());
+        }
+
+        TabularData table = proxy.browseAsTable();
+        LOG.info("Found tabular data: " + table);
+        assertTrue("Table should not be empty!", table.size() > 0);
+
+        assertEquals("Queue size", outbound.length, proxy.getQueueSize());
+        assertEquals("Queue size", outbound.length, compdatalist.length);
+        assertEquals("Queue size", outbound.length, table.size());
+
+
+        LOG.info("Send another 10");
+        for (int i=0;i<outbound.length; i++) {
+            producer.send(outbound[i]);
+        }
+
+        LOG.info("Browse again");
+
+        messcount = proxy.getQueueSize();
+        LOG.info("current number of messages in the queue :" + messcount);
+
+        compdatalist = proxy.browse();
+        if (compdatalist.length == 0) {
+            fail("There is no message in the queue:");
+        }
+        messageIDs = new String[compdatalist.length];
+
+        for (int i = 0; i < compdatalist.length; i++) {
+            CompositeData cdata = compdatalist[i];
+
+            if (i == 0) {
+                LOG.info("Columns: " + cdata.getCompositeType().keySet());
+            }
+            messageIDs[i] = (String)cdata.get("JMSMessageID");
+            LOG.info("message " + i + " : " + cdata.values());
+        }
+
+        table = proxy.browseAsTable();
+        LOG.info("Found tabular data: " + table);
+        assertTrue("Table should not be empty!", table.size() > 0);
+
+        assertEquals("Queue size", outbound.length*2, proxy.getQueueSize());
+        assertEquals("Queue size", outbound.length*2, compdatalist.length);
+        assertEquals("Queue size", outbound.length * 2, table.size());
+
+        consumer = session.createConsumer(destination);
+        for (int i=0; i<outbound.length * 2; i++) {
+            assertNotNull("Got message: " + i, consumer.receive(2000));
+        }
+        consumer.close();
+    }
+
     public void testBrowseReceive() throws Exception {
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         ActiveMQQueue destination = new ActiveMQQueue("TEST");
        
         connection.start();
 
+        // create consumer
+        MessageConsumer consumer = session.createConsumer(destination);
+        // lets consume any outstanding messages from previous test runs
+        while (consumer.receive(1000) != null) {
+        }
+
         Message[] outbound = new Message[]{session.createTextMessage("First Message"),
                                            session.createTextMessage("Second Message"),
                                            session.createTextMessage("Third Message")};
@@ -123,10 +291,7 @@ public class JmsQueueBrowserTest extends
         // create browser first
         QueueBrowser browser = session.createBrowser((Queue) destination);
         Enumeration enumeration = browser.getEnumeration();
-        
-        // create consumer
-        MessageConsumer consumer = session.createConsumer(destination);
-        
+
         // browse the first message
         assertTrue("should have received the first message", enumeration.hasMoreElements());
         assertEquals(outbound[0], (Message) enumeration.nextElement());
@@ -155,6 +320,10 @@ public class JmsQueueBrowserTest extends
 
         MessageProducer producer = session.createProducer(destination);
         MessageConsumer consumer = session.createConsumer(destinationPrefetch10);
+
+        // lets consume any outstanding messages from previous test runs
+        while (consumer.receive(1000) != null) {
+        }
   
         for (int i=0; i<numMessages; i++) {
             TextMessage message = session.createTextMessage("Message: " + i);
@@ -199,6 +368,12 @@ public class JmsQueueBrowserTest extends
                                            session.createTextMessage("Third Message")};
 
 
+        // create consumer
+        MessageConsumer consumer = session.createConsumer(destination);
+        // lets consume any outstanding messages from previous test runs
+        while (consumer.receive(1000) != null) {
+        }
+
         MessageProducer producer = session.createProducer(destination);
         producer.send(outbound[0]);
         producer.send(outbound[1]);
@@ -218,8 +393,6 @@ public class JmsQueueBrowserTest extends
 
         browser.close();
 
-        // create consumer
-        MessageConsumer consumer = session.createConsumer(destination);
 
         // Receive the first message.
         TextMessage msg = (TextMessage)consumer.receive(1000);
@@ -233,4 +406,15 @@ public class JmsQueueBrowserTest extends
         producer.close();
 
     }
+
+    protected BrokerService createBroker() throws Exception {
+        BrokerService brokerService = super.createBroker();
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry policyEntry = new PolicyEntry();
+        policyEntry.setUseCache(isUseCache);
+        policyMap.setDefaultEntry(policyEntry);
+        brokerService.setDestinationPolicy(policyMap);
+        return brokerService;
+    }
+
 }