You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2008/08/29 10:21:10 UTC

svn commit: r690144 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/Queue.java test/java/org/apache/activemq/bugs/AMQ1917Test.java

Author: gtully
Date: Fri Aug 29 01:21:09 2008
New Revision: 690144

URL: http://svn.apache.org/viewvc?rev=690144&view=rev
Log:
fix AMQ-1917

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1917Test.java   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=690144&r1=690143&r2=690144&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Fri Aug 29 01:21:09 2008
@@ -210,11 +210,13 @@
     LinkedList<RecoveryDispatch> recoveries = new LinkedList<RecoveryDispatch>();
 
     public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
+        // synchronize with dispatch method so that no new messages are sent
+        // while setting up a subscription. avoid out of order messages,
+        // duplicates, etc.
         dispatchLock.lock();
         try {
             sub.add(context, this);
             destinationStatistics.getConsumers().increment();
-//            MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
 
             // needs to be synchronized - so no contention with dispatching
             synchronized (consumers) {
@@ -229,32 +231,28 @@
                     dispatchSelector.setExclusiveConsumer(exclusiveConsumer);
                 }
             }
-            // synchronize with dispatch method so that no new messages are sent
-            // while
-            // setting up a subscription. avoid out of order messages,
-            // duplicates
-            // etc.
+            
+            // any newly paged in messages that are not dispatched are added to pagedInPending in iterate()
             doPageIn(false);
-
+            
             synchronized (pagedInMessages) {
                 RecoveryDispatch rd = new RecoveryDispatch();
                 rd.messages =  new ArrayList<QueueMessageReference>(pagedInMessages.values());
                 rd.subscription = sub;
                 recoveries.addLast(rd);
             }
-            
             if( sub instanceof QueueBrowserSubscription ) {
                 ((QueueBrowserSubscription)sub).incrementQueueRef();
             }
             if (!this.optimizedDispatch) {
-                    wakeup();
+                wakeup();
             }
         }finally {
             dispatchLock.unlock();
         }
         if (this.optimizedDispatch) {
-        // Outside of dispatchLock() to maintain the lock hierarchy of
-        // iteratingMutex -> dispatchLock. - see https://issues.apache.org/activemq/browse/AMQ-1878
+            // Outside of dispatchLock() to maintain the lock hierarchy of
+            // iteratingMutex -> dispatchLock. - see https://issues.apache.org/activemq/browse/AMQ-1878
             wakeup();
         }
     }
@@ -262,11 +260,10 @@
     public void removeSubscription(ConnectionContext context, Subscription sub)
             throws Exception {
         destinationStatistics.getConsumers().decrement();
+        // synchronize with dispatch method so that no new messages are sent
+        // while removing up a subscription.
         dispatchLock.lock();
         try {
-            // synchronize with dispatch method so that no new messages are sent
-            // while
-            // removing up a subscription.
             synchronized (consumers) {
                 removeFromConsumerList(sub);
                 if (sub.getConsumerInfo().isExclusive()) {
@@ -324,7 +321,6 @@
     }
 
     public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception {
-//        System.out.println(getName()+" send "+message.getMessageId());
         final ConnectionContext context = producerExchange.getConnectionContext();
         // There is delay between the client sending it and it arriving at the
         // destination.. it may have expired.
@@ -934,9 +930,17 @@
 	                for (QueueMessageReference node : rd.messages) {
 	                    if (!node.isDropped() && !node.isAcked() && (!node.isDropped() || rd.subscription.getConsumerInfo().isBrowser())) {
 	                        msgContext.setMessageReference(node);
-	                            if (rd.subscription.matches(node, msgContext)) {
-	                                rd.subscription.add(node);
+	                        if (rd.subscription.matches(node, msgContext)) {
+	                            rd.subscription.add(node);
+	                        } else {
+	                            // make sure it gets queued for dispatched again
+	                            dispatchLock.lock();
+	                            try {
+	                                pagedInPendingDispatch.add(node);
+	                            } finally {
+	                                dispatchLock.unlock();
 	                            }
+	                        }
 	                    }
 	                }
 	                
@@ -949,24 +953,24 @@
 	            }
 	        }
 	
-	        boolean result = false;
+	        boolean pageInMoreMessages = false;
 	        synchronized (messages) {
-	            result = !messages.isEmpty();
+	            pageInMoreMessages = !messages.isEmpty();
 	        }               
 	        
 	        // Kinda ugly.. but I think dispatchLock is the only mutex protecting the 
 	        // pagedInPendingDispatch variable. 	        
 	        dispatchLock.lock();
 	        try {
-	            result |= !pagedInPendingDispatch.isEmpty();
+	            pageInMoreMessages |= !pagedInPendingDispatch.isEmpty();
 	        } finally {
 	            dispatchLock.unlock();
 	        }
 	        
 	        // Perhaps we should page always into the pagedInPendingDispatch list is 
-                // !messages.isEmpty(), and then if !pagedInPendingDispatch.isEmpty()
-                // then we do a dispatch.
-	        if (result) {
+	        // !messages.isEmpty(), and then if !pagedInPendingDispatch.isEmpty()
+	        // then we do a dispatch.
+	        if (pageInMoreMessages) {
 	            try {
 	               pageInMessages(false);
 	               
@@ -1116,8 +1120,8 @@
             int toPageIn = (getMaxPageSize()+(int)destinationStatistics.getInflight().getCount()) - pagedInMessages.size();
             toPageIn = Math.min(toPageIn,getMaxPageSize());
             if (isLazyDispatch()&& !force) {
-             // Only page in the minimum number of messages which can be dispatched immediately.
-             toPageIn = Math.min(getConsumerMessageCountBeforeFull(), toPageIn);
+                // Only page in the minimum number of messages which can be dispatched immediately.
+                toPageIn = Math.min(getConsumerMessageCountBeforeFull(), toPageIn);
             }
             if ((force || !consumers.isEmpty()) && toPageIn > 0) {
                 messages.setMaxBatchSize(toPageIn);
@@ -1158,21 +1162,17 @@
         dispatchLock.lock();
         try {
             if(!pagedInPendingDispatch.isEmpty()) {
- //              System.out.println(getName()+": dispatching from pending: "+pagedInPendingDispatch.size());
                 // Try to first dispatch anything that had not been dispatched before.
                 pagedInPendingDispatch = doActualDispatch(pagedInPendingDispatch);
-//                System.out.println(getName()+": new pending list1: "+pagedInPendingDispatch.size());
             }
             // and now see if we can dispatch the new stuff.. and append to the pending 
             // list anything that does not actually get dispatched.
             if (list != null && !list.isEmpty()) {
-//                System.out.println(getName()+": dispatching from paged in: "+list.size());
                 if (pagedInPendingDispatch.isEmpty()) {
                     pagedInPendingDispatch.addAll(doActualDispatch(list));
                 } else {
                     pagedInPendingDispatch.addAll(list);
                 }
-//                System.out.println(getName()+": new pending list2: "+pagedInPendingDispatch.size());
             }
         } finally {
             dispatchLock.unlock();
@@ -1200,7 +1200,6 @@
                         if (!s.isFull()) {
                             // Dispatch it.
                             s.add(node);
-                            //System.err.println(getName()+" Dispatched to "+s.getConsumerInfo().getConsumerId()+", "+node.getMessageId());
                             target = s;
                             break;
                         } else {

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1917Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1917Test.java?rev=690144&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1917Test.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1917Test.java Fri Aug 29 01:21:09 2008
@@ -0,0 +1,207 @@
+package org.apache.activemq.bugs;
+
+import junit.framework.TestCase;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQDestination;
+
+
+public class AMQ1917Test extends TestCase {
+
+        private static final int NUM_MESSAGES = 4000;
+        private static final int NUM_THREADS = 10;
+        public static final String REQUEST_QUEUE = "mock.in.queue";
+        public static final String REPLY_QUEUE = "mock.out.queue";
+
+        Destination requestDestination = ActiveMQDestination.createDestination(
+                REQUEST_QUEUE, ActiveMQDestination.QUEUE_TYPE);
+        Destination replyDestination = ActiveMQDestination.createDestination(
+                REPLY_QUEUE, ActiveMQDestination.QUEUE_TYPE);
+
+        CountDownLatch roundTripLatch = new CountDownLatch(NUM_MESSAGES);
+        CountDownLatch errorLatch = new CountDownLatch(1);
+        ThreadPoolExecutor tpe;
+        final String BROKER_URL = "tcp://localhost:61616";
+        BrokerService broker = null;
+        private boolean working = true;
+        
+        // trival session/producer pool
+        final Session[] sessions = new Session[NUM_THREADS];
+        final MessageProducer[] producers = new MessageProducer[NUM_THREADS];
+
+        public void setUp() throws Exception {
+            broker = new BrokerService();
+            broker.setPersistent(false);
+            broker.addConnector(BROKER_URL);
+            broker.start();
+            
+            BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(10000);
+            tpe = new ThreadPoolExecutor(NUM_THREADS, NUM_THREADS, 60000,
+                    TimeUnit.MILLISECONDS, queue);
+            ThreadFactory limitedthreadFactory = new LimitedThreadFactory(tpe.getThreadFactory());  
+            tpe.setThreadFactory(limitedthreadFactory);
+        }
+
+        public void tearDown() throws Exception {
+            broker.stop();
+            tpe.shutdown();
+        }
+        
+        public void testLoadedSendRecieveWithCorrelationId() throws Exception {            
+           
+            ActiveMQConnectionFactory connectionFactory = new org.apache.activemq.ActiveMQConnectionFactory();
+            connectionFactory.setBrokerURL(BROKER_URL);
+            Connection connection = connectionFactory.createConnection();          
+            setupReceiver(connection);
+
+            connection = connectionFactory.createConnection();
+            connection.start();
+            
+            // trival session/producer pool   
+            for (int i=0; i<NUM_THREADS; i++) {
+                sessions[i] = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                producers[i] = sessions[i].createProducer(requestDestination);
+            }
+            
+            for (int i = 0; i < NUM_MESSAGES; i++) {
+                MessageSenderReceiver msr = new MessageSenderReceiver(requestDestination,
+                        replyDestination, "Test Message : " + i);
+                tpe.execute(msr);
+            }
+            
+            while (!roundTripLatch.await(4000, TimeUnit.MILLISECONDS)) {
+                if (errorLatch.await(1000, TimeUnit.MILLISECONDS)) {
+                    fail("there was an error, check the console for thread or thread allocation failure");
+                    break;
+                }
+            }
+            working = false;
+        }
+
+        private void setupReceiver(final Connection connection) throws Exception {
+
+            final Session session = connection.createSession(false,
+                    Session.AUTO_ACKNOWLEDGE);
+            final MessageConsumer consumer = session
+                    .createConsumer(requestDestination);
+            final MessageProducer sender = session.createProducer(replyDestination);
+            connection.start();
+
+            new Thread() {
+                public void run() {
+                    while (working) {
+                        // wait for messages in infinitive loop
+                        // time out is set to show the client is awaiting
+                        try {
+                            TextMessage msg = (TextMessage) consumer.receive(20000);
+                            if (msg == null) {
+                                errorLatch.countDown();
+                                fail("Response timed out." 
+                                        + " latchCount=" + roundTripLatch.getCount());
+                            } else {
+                                String result = msg.getText();
+                                //System.out.println("Request:" + (i++)
+                                //        + ", msg=" + result + ", ID" + msg.getJMSMessageID());
+                                TextMessage response = session.createTextMessage();
+                                response.setJMSCorrelationID(msg.getJMSMessageID());
+                                response.setText(result);
+                                sender.send(response);
+                            }
+                        } catch (JMSException e) {
+                            errorLatch.countDown();
+                            fail("Unexpected exception:" + e);
+                        }
+                    }
+                }
+            }.start();
+        }
+
+        class MessageSenderReceiver implements Runnable {
+
+            Destination reqDest;
+            Destination replyDest;
+            String origMsg;
+
+            public MessageSenderReceiver(Destination reqDest,
+                    Destination replyDest, String msg) throws Exception {
+                this.replyDest = replyDest;
+                this.reqDest = reqDest;
+                this.origMsg = msg;
+            }
+
+            private int getIndexFromCurrentThread() {
+                String name = Thread.currentThread().getName();
+                String num = name.substring(name.lastIndexOf('-') +1);
+                int idx = Integer.parseInt(num) -1;
+                assertTrue("idx is in range: idx=" + idx,  idx < NUM_THREADS);
+                return idx;
+            }
+
+            public void run() {
+                try {
+                    // get thread session and producer from pool
+                    int threadIndex = getIndexFromCurrentThread();
+                    Session session = sessions[threadIndex];
+                    MessageProducer producer = producers[threadIndex];
+
+                    final Message sendJmsMsg = session.createTextMessage(origMsg);
+                    producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+                    producer.send(sendJmsMsg);
+
+                    String jmsId = sendJmsMsg.getJMSMessageID();
+                    String selector = "JMSCorrelationID='" + jmsId + "'";
+
+                    MessageConsumer consumer = session.createConsumer(replyDest,
+                            selector);
+                    Message receiveJmsMsg = consumer.receive(2000);
+                    consumer.close();
+                    if (receiveJmsMsg == null) {
+                        errorLatch.countDown();
+                        fail("Unable to receive response for:" + origMsg
+                                + ", with selector=" + selector);
+                    } else {
+                        //System.out.println("received response message :"
+                        //        + ((TextMessage) receiveJmsMsg).getText()
+                        //        + " with selector : " + selector);
+                        roundTripLatch.countDown();
+                    }
+                } catch (JMSException e) {
+                    fail("unexpected exception:" + e);
+                }
+            }
+        }
+        
+        public class LimitedThreadFactory implements ThreadFactory {
+            int threadCount;
+            private ThreadFactory factory;
+            public LimitedThreadFactory(ThreadFactory threadFactory) {
+                this.factory = threadFactory;
+            }
+
+            public Thread newThread(Runnable arg0) {
+                if (++threadCount > NUM_THREADS) {
+                    errorLatch.countDown();
+                    fail("too many threads requested");
+                }       
+                return factory.newThread(arg0);
+            }
+        }
+    }
+

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1917Test.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1917Test.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date