You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2012/11/20 12:59:48 UTC

svn commit: r1411632 - in /activemq/trunk: activemq-broker/src/main/java/org/apache/activemq/broker/region/ activemq-client/src/main/java/org/apache/activemq/ activemq-core/src/test/java/org/apache/activemq/broker/ activemq-core/src/test/java/org/apach...

Author: dejanb
Date: Tue Nov 20 11:59:46 2012
New Revision: 1411632

URL: http://svn.apache.org/viewvc?rev=1411632&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-4181 - jms queue browsing

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/QueueBrowsingTest.java
Modified:
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
    activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQQueueBrowser.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerTest.java

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=1411632&r1=1411631&r2=1411632&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Tue Nov 20 11:59:46 2012
@@ -69,14 +69,19 @@ public abstract class PrefetchSubscripti
     protected final Object dispatchLock = new Object();
     private final CountDownLatch okForAckAsDispatchDone = new CountDownLatch(1);
 
-    public PrefetchSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, PendingMessageCursor cursor) throws InvalidSelectorException {
+    public PrefetchSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, PendingMessageCursor cursor) throws JMSException {
         super(broker,context, info);
         this.usageManager=usageManager;
         pending = cursor;
+        try {
+            pending.start();
+        } catch (Exception e) {
+            throw new JMSException(e.getMessage());
+        }
         this.scheduler = broker.getScheduler();
     }
 
-    public PrefetchSubscription(Broker broker,SystemUsage usageManager, ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
+    public PrefetchSubscription(Broker broker,SystemUsage usageManager, ConnectionContext context, ConsumerInfo info) throws JMSException {
         this(broker,usageManager,context, info, new VMPendingMessageCursor(false));
     }
 

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=1411632&r1=1411631&r2=1411632&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java Tue Nov 20 11:59:46 2012
@@ -568,6 +568,10 @@ public class Queue extends BaseDestinati
                     }
                     redeliveredWaitingDispatch.addMessageLast(qmr);
                 }
+                if (sub instanceof QueueBrowserSubscription) {
+                    ((QueueBrowserSubscription)sub).decrementQueueRef();
+                    browserDispatches.remove(sub);
+                }
                 if (!redeliveredWaitingDispatch.isEmpty()) {
                     doDispatch(new OrderedPendingList());
                 }
@@ -1460,7 +1464,6 @@ public class Queue extends BaseDestinati
                 }
             }
 
-            BrowserDispatch pendingBrowserDispatch = browserDispatches.poll();
 
             messagesLock.readLock().lock();
             try{
@@ -1481,16 +1484,18 @@ public class Queue extends BaseDestinati
             // !messages.isEmpty(), and then if
             // !pagedInPendingDispatch.isEmpty()
             // then we do a dispatch.
-            if (pageInMoreMessages || pendingBrowserDispatch != null || !redeliveredWaitingDispatch.isEmpty()) {
+            boolean hasBrowsers = browserDispatches.size() > 0;
+
+            if (pageInMoreMessages || hasBrowsers || !redeliveredWaitingDispatch.isEmpty()) {
                 try {
-                    pageInMessages(pendingBrowserDispatch != null);
+                    pageInMessages(hasBrowsers);
 
                 } catch (Throwable e) {
                     LOG.error("Failed to page in more queue messages ", e);
                 }
             }
 
-            if (pendingBrowserDispatch != null) {
+            if (hasBrowsers) {
                 ArrayList<QueueMessageReference> alreadyDispatchedMessages = null;
                 pagedInMessagesLock.readLock().lock();
                 try{
@@ -1498,30 +1503,38 @@ public class Queue extends BaseDestinati
                 }finally {
                     pagedInMessagesLock.readLock().unlock();
                 }
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("dispatch to browser: " + pendingBrowserDispatch.getBrowser()
-                            + ", already dispatched/paged count: " + alreadyDispatchedMessages.size());
-                }
-                do {
+
+                Iterator<BrowserDispatch> browsers = browserDispatches.iterator();
+                while (browsers.hasNext()) {
+                    BrowserDispatch browserDispatch = browsers.next();
                     try {
                         MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
                         msgContext.setDestination(destination);
 
-                        QueueBrowserSubscription browser = pendingBrowserDispatch.getBrowser();
+                        QueueBrowserSubscription browser = browserDispatch.getBrowser();
+
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("dispatch to browser: " + browser
+                                    + ", already dispatched/paged count: " + alreadyDispatchedMessages.size());
+                        }
+                        boolean added = false;
                         for (QueueMessageReference node : alreadyDispatchedMessages) {
-                            if (!node.isAcked()) {
+                            if (!node.isAcked() && !browser.getPending().getMessageAudit().isDuplicate(node.getMessageId())) {
                                 msgContext.setMessageReference(node);
                                 if (browser.matches(node, msgContext)) {
                                     browser.add(node);
+                                    added = true;
                                 }
                             }
                         }
-                        pendingBrowserDispatch.done();
+                        // are we done browsing? no new messages paged
+                        if (!added) {
+                            browser.decrementQueueRef();
+                        }
                     } catch (Exception e) {
-                        LOG.warn("exception on dispatch to browser: " + pendingBrowserDispatch.getBrowser(), e);
+                        LOG.warn("exception on dispatch to browser: " + browserDispatch.getBrowser(), e);
                     }
-
-                } while ((pendingBrowserDispatch = browserDispatches.poll()) != null);
+                }
             }
 
             if (pendingWakeups.get() > 0) {

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java?rev=1411632&r1=1411631&r2=1411632&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java Tue Nov 20 11:59:46 2012
@@ -20,8 +20,11 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import javax.jms.InvalidSelectorException;
+import javax.jms.JMSException;
+
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.filter.MessageEvaluationContext;
@@ -34,7 +37,7 @@ public class QueueBrowserSubscription ex
     boolean destinationsAdded;
 
     public QueueBrowserSubscription(Broker broker,SystemUsage usageManager, ConnectionContext context, ConsumerInfo info)
-        throws InvalidSelectorException {
+        throws JMSException {
         super(broker,usageManager, context, info);
     }
 

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java?rev=1411632&r1=1411631&r2=1411632&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java Tue Nov 20 11:59:46 2012
@@ -34,7 +34,7 @@ public class QueueSubscription extends P
 
     private static final Logger LOG = LoggerFactory.getLogger(QueueSubscription.class);
 
-    public QueueSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
+    public QueueSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info) throws JMSException {
         super(broker,usageManager, context, info);
     }
 

Modified: activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQQueueBrowser.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQQueueBrowser.java?rev=1411632&r1=1411631&r2=1411632&view=diff
==============================================================================
--- activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQQueueBrowser.java (original)
+++ activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQQueueBrowser.java Tue Nov 20 11:59:46 2012
@@ -202,6 +202,7 @@ public class ActiveMQQueueBrowser implem
     }
 
     public synchronized void close() throws JMSException {
+        browseDone.set(true);
         destroyConsumer();
         closed = true;
     }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerTest.java?rev=1411632&r1=1411631&r2=1411632&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerTest.java Tue Nov 20 11:59:46 2012
@@ -264,7 +264,7 @@ public class BrokerTest extends BrokerTe
             messages.add(m1);
         }
 
-        for (int i = 0; i < 1; i++) {
+        for (int i = 0; i < 4; i++) {
             Message m1 = messages.get(i);
             Message m2 = receiveMessage(connection2);
             assertNotNull("m2 is null for index: " + i, m2);

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/QueueBrowsingTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/QueueBrowsingTest.java?rev=1411632&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/QueueBrowsingTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/QueueBrowsingTest.java Tue Nov 20 11:59:46 2012
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.*;
+
+import java.net.URI;
+import java.util.Enumeration;
+
+import static org.junit.Assert.*;
+
+public class QueueBrowsingTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(QueueBrowsingTest.class);
+
+    private BrokerService broker;
+    private URI connectUri;
+    private ActiveMQConnectionFactory factory;
+
+
+    @Before
+    public void startBroker() throws Exception {
+        broker = new BrokerService();
+        TransportConnector connector = broker.addConnector("tcp://0.0.0.0:0");
+        broker.deleteAllMessages();
+        broker.start();
+        broker.waitUntilStarted();
+        connectUri = connector.getConnectUri();
+        factory = new ActiveMQConnectionFactory(connectUri);
+    }
+
+    @After
+    public void stopBroker() throws Exception {
+        broker.stop();
+        broker.waitUntilStopped();
+    }
+
+    @Test
+    public void testBrowsing() throws JMSException {
+
+        int messageToSend = 370;
+
+        ActiveMQQueue queue = new ActiveMQQueue("TEST");
+        Connection connection = factory.createConnection();
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(queue);
+
+        String data = "";
+        for( int i=0; i < 1024*2; i++ ) {
+            data += "x";
+        }
+
+        for( int i=0; i < messageToSend; i++ ) {
+            producer.send(session.createTextMessage(data));
+        }
+
+        QueueBrowser browser = session.createBrowser(queue);
+        Enumeration enumeration = browser.getEnumeration();
+        int received = 0;
+        while (enumeration.hasMoreElements()) {
+            Message m = (Message) enumeration.nextElement();
+            received++;
+        }
+
+        browser.close();
+
+        assertEquals(messageToSend, received);
+    }
+
+    @Test
+    public void testBrowseConcurrent() throws Exception {
+        final int messageToSend = 370;
+
+        final ActiveMQQueue queue = new ActiveMQQueue("TEST");
+        Connection connection = factory.createConnection();
+        connection.start();
+        final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        MessageProducer producer = session.createProducer(queue);
+
+        String data = "";
+        for( int i=0; i < 1024*2; i++ ) {
+            data += "x";
+        }
+
+        for( int i=0; i < messageToSend; i++ ) {
+            producer.send(session.createTextMessage(data));
+        }
+
+        Thread browserThread = new Thread() {
+            @Override
+            public void run() {
+                try {
+                    QueueBrowser browser = session.createBrowser(queue);
+                    Enumeration enumeration = browser.getEnumeration();
+                    int received = 0;
+                    while (enumeration.hasMoreElements()) {
+                        Message m = (Message) enumeration.nextElement();
+                        received++;
+                    }
+                    assertEquals("Browsed all messages", messageToSend, received);
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        };
+
+        browserThread.start();
+
+        Thread consumerThread = new Thread() {
+            @Override
+            public void run() {
+                try {
+                    MessageConsumer consumer = session.createConsumer(queue);
+                    int received = 0;
+                    while (true) {
+                        Message m = (Message) consumer.receive(1000);
+                        if (m == null)
+                            break;
+                        received++;
+                    }
+                    assertEquals("Consumed all messages", messageToSend, received);
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        };
+
+        consumerThread.start();
+
+        browserThread.join();
+        consumerThread.join();
+
+    }
+
+    @Test
+    public void testMemoryLimit() throws Exception {
+        broker.getSystemUsage().getMemoryUsage().setLimit(10 * 1024);
+
+        int messageToSend = 370;
+
+        ActiveMQQueue queue = new ActiveMQQueue("TEST");
+        Connection connection = factory.createConnection();
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(queue);
+
+        String data = "";
+        for( int i=0; i < 1024*2; i++ ) {
+            data += "x";
+        }
+
+        for( int i=0; i < messageToSend; i++ ) {
+            producer.send(session.createTextMessage(data));
+        }
+
+        QueueBrowser browser = session.createBrowser(queue);
+        Enumeration enumeration = browser.getEnumeration();
+        int received = 0;
+        while (enumeration.hasMoreElements()) {
+            Message m = (Message) enumeration.nextElement();
+            received++;
+            LOG.info("Browsed message " + received + ": " + m.getJMSMessageID());
+        }
+
+        browser.close();
+
+        assertEquals(3, received);
+    }
+
+
+
+}