You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2009/02/18 10:44:31 UTC

svn commit: r745456 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/ test/java/org/apache/activemq/broker/

Author: gtully
Date: Wed Feb 18 09:44:31 2009
New Revision: 745456

URL: http://svn.apache.org/viewvc?rev=745456&view=rev
Log:
little refactor of recovery dispatch as now only used for browser dispatch

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=745456&r1=745455&r2=745456&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Wed Feb 18 09:44:31 2009
@@ -18,6 +18,7 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashSet;
@@ -214,12 +215,37 @@
         }
     }
 
-    class RecoveryDispatch {
-        public ArrayList<QueueMessageReference> messages;
-        public Subscription subscription;
+    /*
+     * Holder for a browser subscription and a snapshot of the
+     * pagedInMessages taken when it subscribed, as a browser needs
+     * access to messages in the queue that have already been dispatched
+     */
+    class BrowserDispatch {
+        ArrayList<QueueMessageReference> messages;
+        QueueBrowserSubscription browser;
+        
+        public BrowserDispatch(QueueBrowserSubscription browserSubscription,
+                Collection<QueueMessageReference> values) {
+            
+            messages =  new ArrayList<QueueMessageReference>(values);
+            browser = browserSubscription;
+            browser.incrementQueueRef();
+        }
+        
+        void done() {
+            try {
+                browser.decrementQueueRef();
+            } catch (Exception e) {
+                LOG.warn("decrement ref on browser: " + browser, e);
+            }
+        }
+
+        public QueueBrowserSubscription getBrowser() {
+            return browser;
+        }
     }
    
-    LinkedList<RecoveryDispatch> recoveries = new LinkedList<RecoveryDispatch>();
+    LinkedList<BrowserDispatch> browserDispatches = new LinkedList<BrowserDispatch>();
 
     public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
         // synchronize with dispatch method so that no new messages are sent
@@ -257,19 +283,18 @@
                 }
             }
             
-            // do recovery dispatch only if it is a browser subscription
-            if(sub instanceof QueueBrowserSubscription ) { 
-            	// any newly paged in messages that are not dispatched are added to pagedInPending in iterate()
-            	doPageIn(false);
+            if (sub instanceof QueueBrowserSubscription ) { 
+                QueueBrowserSubscription browserSubscription = (QueueBrowserSubscription) sub;
+            	
+                // page in now; paging in is done again in iterate() to ensure new messages are dispatched
+                doPageIn(false);
             
             	synchronized (pagedInMessages) {
-            		RecoveryDispatch rd = new RecoveryDispatch();
-            		rd.messages =  new ArrayList<QueueMessageReference>(pagedInMessages.values());
-            		rd.subscription = sub;
-            		recoveries.addLast(rd);
+            	    if (!pagedInMessages.isEmpty()) {
+            	        BrowserDispatch browserDispatch = new BrowserDispatch(browserSubscription, pagedInMessages.values());
+            	        browserDispatches.addLast(browserDispatch);
+            	    }
             	}
-            
-                ((QueueBrowserSubscription)sub).incrementQueueRef();
             }
             if (!(this.optimizedDispatch || isSlave())) {
                 wakeup();
@@ -971,64 +996,45 @@
         return movedCounter;
     }
     
-    RecoveryDispatch getNextRecoveryDispatch() {
+    BrowserDispatch getNextBrowserDispatch() {
         synchronized (pagedInMessages) {
-            if( recoveries.isEmpty() ) {
+            if( browserDispatches.isEmpty() ) {
                 return null;
             }
-            return recoveries.removeFirst();
+            return browserDispatches.removeFirst();
         }
 
     }
-    protected boolean isRecoveryDispatchEmpty() {
-        synchronized (pagedInMessages) {
-            return recoveries.isEmpty();
-        }
-    }
 
     /**
      * @return true if we would like to iterate again
      * @see org.apache.activemq.thread.Task#iterate()
      */
     public boolean iterate() {
+        boolean pageInMoreMessages = false;
         synchronized(iteratingMutex) {
-	        RecoveryDispatch rd;
-	        while ((rd = getNextRecoveryDispatch()) != null) {
+            BrowserDispatch rd;
+	        while ((rd = getNextBrowserDispatch()) != null) {
+	            pageInMoreMessages = true;
+	            
 	            try {
 	                MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
 	                msgContext.setDestination(destination);
 	    
+	                QueueBrowserSubscription browser = rd.getBrowser();
 	                for (QueueMessageReference node : rd.messages) {
-	                    if (!node.isDropped() && !node.isAcked() && (!node.isDropped() || rd.subscription.getConsumerInfo().isBrowser())) {
+	                    if (!node.isAcked()) {
 	                        msgContext.setMessageReference(node);
-	                        if (rd.subscription.matches(node, msgContext)) {
- 	                            // Log showing message dispatching
- 	                            if (LOG.isDebugEnabled()) {
- 	                                LOG.debug(destination.getQualifiedName() + " - Recovery - Message pushed '" + node.hashCode() + " - " + node + "' to subscription: '" + rd.subscription + "'");
- 	                            }
-	                            rd.subscription.add(node);
-	                        } else {
-	                            // make sure it gets queued for dispatched again
-	                            dispatchLock.lock();
-	                            try {
-	                                synchronized(pagedInPendingDispatch) {
-	                                    if (!pagedInPendingDispatch.contains(node)) {
-	                                        pagedInPendingDispatch.add(node);
-	                                    }
-	                                }
-	                            } finally {
-	                                dispatchLock.unlock();
-	                            }
+	                        if (browser.matches(node, msgContext)) {
+	                            browser.add(node);
 	                        }
 	                    }
 	                }
-	                
-	                if( rd.subscription instanceof QueueBrowserSubscription ) {
-	                    ((QueueBrowserSubscription)rd.subscription).decrementQueueRef();
-	                }
-	                
+	                                    
+                    rd.done();
+
 	            } catch (Exception e) {
-	                e.printStackTrace();
+	                LOG.warn("exception on dispatch to browser: " + rd.getBrowser(), e);
 	            }
 	        }
 	        
@@ -1061,7 +1067,6 @@
 	        	}
 	        }
 	        
-	        boolean pageInMoreMessages = false;
 	        synchronized (messages) {
 	            pageInMoreMessages = !messages.isEmpty();
 	        }               

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java?rev=745456&r1=745455&r2=745456&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java Wed Feb 18 09:44:31 2009
@@ -62,12 +62,9 @@
        
     public boolean canSelect(Subscription subscription,
             MessageReference m) throws Exception {
-        if (subscription.isBrowser() && super.canDispatch(subscription, m)) {
-            return true;
-        }
        
-        boolean result =  super.canDispatch(subscription, m) ;
-        if (result) {
+        boolean result =  super.canDispatch(subscription, m);
+        if (result && !subscription.isBrowser()) {
             result = exclusiveConsumer == null
                     || exclusiveConsumer == subscription;
             if (result) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java?rev=745456&r1=745455&r2=745456&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java Wed Feb 18 09:44:31 2009
@@ -72,27 +72,5 @@
             LOG.debug(" changed ownership of " + this + " to "+ tempDest.getConnectionId());
         }
         super.addSubscription(context, sub);
-    } 
-    
-    public void xwakeup() {
-        boolean result = false;
-        synchronized (messages) {
-            result = !messages.isEmpty();
-        }
-        if (result) {
-            try {
-               pageInMessages(false);
-               
-            } catch (Throwable e) {
-                LOG.error("Failed to page in more queue messages ", e);
-            }
-        }
-        if (!messagesWaitingForSpace.isEmpty() || !isRecoveryDispatchEmpty()) {
-            try {
-                taskRunner.wakeup();
-            } catch (InterruptedException e) {
-                LOG.warn("Task Runner failed to wakeup ", e);
-            }
-        }
     }
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerTest.java?rev=745456&r1=745455&r2=745456&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerTest.java Wed Feb 18 09:44:31 2009
@@ -161,6 +161,124 @@
         assertNoMessagesLeft(connection2);
     }
 
+    
+    /*
+     * variation of the above test: the browser is created before any messages are sent
+     */
+    public void testQueueBrowserWith2ConsumersBrowseFirst() throws Exception {
+
+        ActiveMQDestination destination = new ActiveMQQueue("TEST");
+        deliveryMode = DeliveryMode.NON_PERSISTENT;
+        
+        
+        // Setup a second connection with a queue browser.
+        StubConnection connection2 = createConnection();
+        ConnectionInfo connectionInfo2 = createConnectionInfo();
+        SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
+        ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination);
+        consumerInfo2.setPrefetchSize(10);
+        consumerInfo2.setBrowser(true);
+        connection2.send(connectionInfo2);
+        connection2.send(sessionInfo2);
+        connection2.request(consumerInfo2);
+
+        // Setup a first connection
+        StubConnection connection1 = createConnection();
+        ConnectionInfo connectionInfo1 = createConnectionInfo();
+        SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
+        ProducerInfo producerInfo = createProducerInfo(sessionInfo1);
+        connection1.send(connectionInfo1);
+        connection1.send(sessionInfo1);
+        connection1.send(producerInfo);
+
+        ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
+        consumerInfo1.setPrefetchSize(10);
+        connection1.request(consumerInfo1);
+
+        // Send the messages
+        connection1.send(createMessage(producerInfo, destination, deliveryMode));
+        connection1.send(createMessage(producerInfo, destination, deliveryMode));
+        connection1.send(createMessage(producerInfo, destination, deliveryMode));
+        //as the messages are sent async - need to synchronize the last
+        //one to ensure they arrive in the order we want
+        connection1.request(createMessage(producerInfo, destination, deliveryMode));
+
+
+        List<Message> messages = new ArrayList<Message>();
+
+        for (int i = 0; i < 4; i++) {
+            Message m1 = receiveMessage(connection1);
+            assertNotNull("m1 is null for index: " + i, m1);
+            messages.add(m1);
+        }
+
+        // no messages present in queue browser as there were no messages when it
+        // was created
+        assertNoMessagesLeft(connection1);
+        assertNoMessagesLeft(connection2);
+    }
+
+    public void testQueueBrowserWith2ConsumersInterleaved() throws Exception {
+
+        ActiveMQDestination destination = new ActiveMQQueue("TEST");
+        deliveryMode = DeliveryMode.NON_PERSISTENT;
+        
+        // Setup a first connection
+        StubConnection connection1 = createConnection();
+        ConnectionInfo connectionInfo1 = createConnectionInfo();
+        SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
+        ProducerInfo producerInfo = createProducerInfo(sessionInfo1);
+        connection1.send(connectionInfo1);
+        connection1.send(sessionInfo1);
+        connection1.send(producerInfo);
+
+        ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
+        consumerInfo1.setPrefetchSize(10);
+        connection1.request(consumerInfo1);
+
+        // Send the first message before the browser is created
+        connection1.request(createMessage(producerInfo, destination, deliveryMode));
+        
+        // Setup a second connection with a queue browser.
+        StubConnection connection2 = createConnection();
+        ConnectionInfo connectionInfo2 = createConnectionInfo();
+        SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
+        ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination);
+        consumerInfo2.setPrefetchSize(1);
+        consumerInfo2.setBrowser(true);
+        connection2.send(connectionInfo2);
+        connection2.send(sessionInfo2);
+        connection2.request(consumerInfo2);
+
+        
+        connection1.send(createMessage(producerInfo, destination, deliveryMode));
+        connection1.send(createMessage(producerInfo, destination, deliveryMode));
+        //as the messages are sent async - need to synchronize the last
+        //one to ensure they arrive in the order we want
+        connection1.request(createMessage(producerInfo, destination, deliveryMode));
+
+        
+        List<Message> messages = new ArrayList<Message>();
+
+        for (int i = 0; i < 4; i++) {
+            Message m1 = receiveMessage(connection1);
+            assertNotNull("m1 is null for index: " + i, m1);
+            messages.add(m1);
+        }
+
+        for (int i = 0; i < 1; i++) {
+            Message m1 = messages.get(i);
+            Message m2 = receiveMessage(connection2);
+            assertNotNull("m2 is null for index: " + i, m2);
+            assertEquals(m1.getMessageId(), m2.getMessageId());
+            connection2.send(createAck(consumerInfo2, m2, 1, MessageAck.DELIVERED_ACK_TYPE));
+        }
+
+        assertNoMessagesLeft(connection1);
+        assertNoMessagesLeft(connection2);
+    }
+
+    
     public void initCombosForTestConsumerPrefetchAndStandardAck() {
         addCombinationValues("deliveryMode", new Object[] {
         // Integer.valueOf(DeliveryMode.NON_PERSISTENT),