You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2014/01/07 01:57:57 UTC

svn commit: r1556096 - in /qpid/trunk/qpid/java: broker-core/src/main/java/org/apache/qpid/server/queue/ broker-core/src/main/java/org/apache/qpid/server/subscription/ systests/src/main/java/org/apache/qpid/server/queue/

Author: robbie
Date: Tue Jan  7 00:57:57 2014
New Revision: 1556096

URL: http://svn.apache.org/r1556096
Log:
QPID-5450: have the group manager try to acquire the message at the time it is accepted into the group, so that the associated shared-group state change occurs within the single synchronization block

Modified:
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=1556096&r1=1556095&r2=1556096&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Tue Jan  7 00:57:57 2014
@@ -733,7 +733,7 @@ public class SimpleAMQQueue implements A
                     && mightAssign(sub, entry)
                     && !sub.wouldSuspend(entry))
                 {
-                    if (sub.acquires() && !(assign(sub, entry) && entry.acquire(sub)))
+                    if (sub.acquires() && !assign(sub, entry))
                     {
                         // restore credit here that would have been taken away by wouldSuspend since we didn't manage
                         // to acquire the entry for this subscription
@@ -754,10 +754,18 @@ public class SimpleAMQQueue implements A
 
     private boolean assign(final Subscription sub, final QueueEntry entry)
     {
-        return _messageGroupManager == null || _messageGroupManager.acceptMessage(sub, entry);
+        if(_messageGroupManager == null)
+        {
+            //no grouping, try to acquire immediately.
+            return entry.acquire(sub);
+        }
+        else
+        {
+            //the group manager is responsible for acquiring the message if/when appropriate
+            return _messageGroupManager.acceptMessage(sub, entry);
+        }
     }
 
-
     private boolean mightAssign(final Subscription sub, final QueueEntry entry)
     {
         if(_messageGroupManager == null || !sub.acquires())
@@ -1645,7 +1653,7 @@ public class SimpleAMQQueue implements A
                 {
                     if (!sub.wouldSuspend(node))
                     {
-                        if (sub.acquires() && !(assign(sub, node) && node.acquire(sub)))
+                        if (sub.acquires() && !assign(sub, node))
                         {
                             // restore credit here that would have been taken away by wouldSuspend since we didn't manage
                             // to acquire the entry for this subscription

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java?rev=1556096&r1=1556095&r2=1556096&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java Tue Jan  7 00:57:57 2014
@@ -63,6 +63,18 @@ public class AssignedSubscriptionMessage
 
     public boolean acceptMessage(Subscription sub, QueueEntry entry)
     {
+        if(assignMessage(sub, entry))
+        {
+            return entry.acquire(sub);
+        }
+        else
+        {
+            return false;
+        }
+    }
+
+    private boolean assignMessage(Subscription sub, QueueEntry entry)
+    {
         Object groupVal = entry.getMessage().getMessageHeader().getHeader(_groupId);
         if(groupVal == null)
         {

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java?rev=1556096&r1=1556095&r2=1556096&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java Tue Jan  7 00:57:57 2014
@@ -136,9 +136,21 @@ public class DefinedGroupMessageGroupMan
 
     public synchronized boolean acceptMessage(final Subscription sub, final QueueEntry entry)
     {
+        if(assignMessage(sub, entry))
+        {
+            return entry.acquire(sub);
+        }
+        else
+        {
+            return false;
+        }
+    }
+
+    private boolean assignMessage(final Subscription sub, final QueueEntry entry)
+    {
         Object groupId = getKey(entry);
         Group group = _groupMap.get(groupId);
-        
+
         if(group == null || !group.isValid())
         {
             group = new Group(groupId, sub);
@@ -152,11 +164,10 @@ public class DefinedGroupMessageGroupMan
             {
                 return false;
             }
-
         }
-        
+
         Subscription assignedSub = group.getSubscription();
-        
+
         if(assignedSub == sub)
         {
             entry.addStateChangeListener(new GroupStateChangeListener(group, entry));
@@ -167,8 +178,7 @@ public class DefinedGroupMessageGroupMan
             return false;            
         }
     }
-    
-    
+
     public synchronized QueueEntry findEarliestAssignedAvailableEntry(final Subscription sub)
     {
         EntryFinder visitor = new EntryFinder(sub);

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java?rev=1556096&r1=1556095&r2=1556096&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java Tue Jan  7 00:57:57 2014
@@ -20,22 +20,28 @@
 */
 package org.apache.qpid.server.queue;
 
-import org.apache.qpid.AMQException;
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.jms.Connection;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.Session;
-import java.util.HashMap;
-import java.util.Map;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
 
 public class MessageGroupQueueTest extends QpidBrokerTestCase
 {
@@ -469,7 +475,6 @@ public class MessageGroupQueueTest exten
 
     }
 
-
     private Message createMessage(int msg, String group) throws JMSException
     {
         Message send = producerSession.createTextMessage("Message: " + msg);
@@ -478,4 +483,122 @@ public class MessageGroupQueueTest exten
 
         return send;
     }
+
+    /**
+     * Tests that, while new messages for a single group id are arriving and the shared-group delivery state
+     * is concurrently being emptied (each message is acked while using prefetch=1), only one of the
+     * existing consumers is ever processing messages for that shared group at any given time.
+     */
+    public void testSingleSharedGroupWithMultipleConsumers() throws Exception
+    {
+        final Map<String,Object> arguments = new HashMap<String, Object>();
+        arguments.put(QueueArgumentsConverter.QPID_GROUP_HEADER_KEY,"group");
+        arguments.put(QueueArgumentsConverter.QPID_SHARED_MSG_GROUP,"1");
+
+        ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments);
+        queue = (Queue) producerSession.createQueue("direct://amq.direct/"+QUEUE+"/"+QUEUE+"?durable='false'&autodelete='true'");
+
+        ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
+        producer = producerSession.createProducer(queue);
+
+        // recreate the consumer connection with max prefetch set to 1
+        consumerConnection.close();
+        Map<String, String> options = new HashMap<String, String>();
+        options.put(ConnectionURL.OPTIONS_MAXPREFETCH, "1");
+        consumerConnection = getConnectionWithOptions(options);
+
+        int numMessages = 100;
+        SharedGroupTestMessageListener groupingTestMessageListener = new SharedGroupTestMessageListener(numMessages);
+        // four sessions on the one connection, each with a consumer feeding the same listener instance
+        Session cs1 = ((AMQConnection)consumerConnection).createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Session cs2 = ((AMQConnection)consumerConnection).createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Session cs3 = ((AMQConnection)consumerConnection).createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Session cs4 = ((AMQConnection)consumerConnection).createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        MessageConsumer consumer1 = cs1.createConsumer(queue);
+        consumer1.setMessageListener(groupingTestMessageListener);
+        MessageConsumer consumer2 = cs2.createConsumer(queue);
+        consumer2.setMessageListener(groupingTestMessageListener);
+        MessageConsumer consumer3 = cs3.createConsumer(queue);
+        consumer3.setMessageListener(groupingTestMessageListener);
+        MessageConsumer consumer4 = cs4.createConsumer(queue);
+        consumer4.setMessageListener(groupingTestMessageListener);
+        consumerConnection.start();
+
+        for(int i = 1; i <= numMessages; i++)
+        {
+            producer.send(createMessage(i, "GROUP"));
+        }
+        producerSession.commit();
+        producer.close();
+        producerSession.close();
+        producerConnection.close();
+
+        assertTrue("Messages not all received in the allowed timeframe", groupingTestMessageListener.waitForLatch(30));
+        assertEquals("Unexpected concurrent processing of messages for the group", 0, groupingTestMessageListener.getConcurrentProcessingCases());
+        assertNull("Unexpected throwable in message listeners", groupingTestMessageListener.getThrowable());
+    }
+
+    /** Listener shared by all the test's consumers: counts overlapping onMessage() calls and keeps the last failure seen. */
+    public static class SharedGroupTestMessageListener implements MessageListener
+    {
+        private final CountDownLatch _count;
+        private final AtomicInteger _activeListeners = new AtomicInteger();
+        private final AtomicInteger _concurrentProcessingCases = new AtomicInteger();
+        private volatile Throwable _throwable;
+
+        public SharedGroupTestMessageListener(int numMessages)
+        {
+            _count = new CountDownLatch(numMessages);
+        }
+
+        public void onMessage(Message message)
+        {
+            // incremented before the try so that the finally block below always balances it
+            int currentActiveListeners = _activeListeners.incrementAndGet();
+            try
+            {
+                if (currentActiveListeners > 1)
+                {
+                    _concurrentProcessingCases.incrementAndGet();
+                    System.err.println("Concurrent processing when handling message: " + message.getIntProperty("msg"));
+                }
+
+                try
+                {
+                    Thread.sleep(25);
+                }
+                catch (InterruptedException e)
+                {
+                    Thread.currentThread().interrupt();
+                }
+            }
+            catch (Throwable t)
+            {
+                _throwable = t;
+                t.printStackTrace();
+            }
+            finally
+            {
+                // undo the increment even on failure, otherwise every later delivery would be counted as concurrent
+                _activeListeners.decrementAndGet();
+                _count.countDown();
+            }
+        }
+
+        public boolean waitForLatch(int seconds) throws Exception
+        {
+            return _count.await(seconds, TimeUnit.SECONDS);
+        }
+
+        public int getConcurrentProcessingCases()
+        {
+            return _concurrentProcessingCases.get();
+        }
+
+        public Throwable getThrowable()
+        {
+            return _throwable;
+        }
+    }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org