You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2010/09/30 17:07:55 UTC

svn commit: r1003096 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/Queue.java main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java test/java/org/apache/activemq/usecases/MessageGroupCloseTest.java

Author: dejanb
Date: Thu Sep 30 15:07:55 2010
New Revision: 1003096

URL: http://svn.apache.org/viewvc?rev=1003096&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-2952 - message groups with small prefetch

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/MessageGroupCloseTest.java
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=1003096&r1=1003095&r2=1003096&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Thu Sep 30 15:07:55 2010
@@ -55,16 +55,7 @@ import org.apache.activemq.broker.region
 import org.apache.activemq.broker.region.group.MessageGroupMapFactory;
 import org.apache.activemq.broker.region.policy.DispatchPolicy;
 import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ConsumerId;
-import org.apache.activemq.command.ExceptionResponse;
-import org.apache.activemq.command.Message;
-import org.apache.activemq.command.MessageAck;
-import org.apache.activemq.command.MessageDispatchNotification;
-import org.apache.activemq.command.MessageId;
-import org.apache.activemq.command.ProducerAck;
-import org.apache.activemq.command.ProducerInfo;
-import org.apache.activemq.command.Response;
+import org.apache.activemq.command.*;
 import org.apache.activemq.filter.BooleanExpression;
 import org.apache.activemq.filter.MessageEvaluationContext;
 import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
@@ -1796,10 +1787,12 @@ public class Queue extends BaseDestinati
                 if (dispatchSelector.canSelect(s, node)) {
                     if (!fullConsumers.contains(s)) {
                         if (!s.isFull()) {
-                            // Dispatch it.
-                            s.add(node);
-                            target = s;                            
-                            break;
+                            if (assignMessageGroup(s, (QueueMessageReference)node)) {
+                                // Dispatch it.
+                                s.add(node);
+                                target = s;
+                                break;
+                            }
                         } else {
                             // no further dispatch of list to a full consumer to
                             // avoid out of order message receipt
@@ -1841,6 +1834,60 @@ public class Queue extends BaseDestinati
         return rc;
     }
 
+    /**
+     * Applies message-group affinity for one dispatch attempt: decides whether
+     * {@code subscription} may take {@code node}, and claims or releases ownership
+     * of the message's group in the queue's group-owner map as a side effect.
+     * A message whose group id is {@code null} is never restricted.
+     *
+     * @param subscription consumer being considered for the message
+     * @param node message reference about to be dispatched
+     * @return {@code false} only if the group is currently owned by a different
+     *         consumer, {@code true} in every other case
+     * @throws Exception if claiming the group via {@code assignGroup} fails
+     */
+    protected boolean assignMessageGroup(Subscription subscription, QueueMessageReference node) throws Exception {
+        final String group = node.getGroupID();
+        final int seq = node.getGroupSequence();
+        if (group == null) {
+            // Not part of a group, so no ownership constraint applies.
+            return true;
+        }
+
+        final MessageGroupMap owners = getMessageGroupOwners();
+        // Sequence 1 opens a group and is always claimed by this consumer without
+        // consulting the map; any other sequence first looks up the current owner.
+        final ConsumerId currentOwner = (seq == 1) ? null : owners.get(group);
+        if (currentOwner == null) {
+            assignGroup(subscription, owners, node, group);
+            return true;
+        }
+
+        if (!currentOwner.equals(subscription.getConsumerInfo().getConsumerId())) {
+            // Another consumer owns the group: refuse this dispatch.
+            return false;
+        }
+
+        if (seq < 0) {
+            // A negative sequence is the end-of-group signal: release ownership.
+            owners.removeGroup(group);
+        }
+        return true;
+    }
+
+    /** Registers {@code subs} as owner of {@code groupId} and sets JMSXGroupFirstForConsumer=true on {@code n} when it is an ActiveMQMessage; a JMSException is logged, not rethrown. */
+    protected void assignGroup(Subscription subs, MessageGroupMap messageGroupOwners, MessageReference n, String groupId) throws IOException {
+        messageGroupOwners.put(groupId, subs.getConsumerInfo().getConsumerId());
+        final Message payload = n.getMessage();
+        if (payload instanceof ActiveMQMessage) {
+            try {
+                ((ActiveMQMessage) payload).setBooleanProperty("JMSXGroupFirstForConsumer", true, false);
+            } catch (JMSException e) {
+                LOG.warn("Failed to set boolean header: " + e, e);
+            }
+        }
+    }
+
     protected void pageInMessages(boolean force) throws Exception {
         doDispatch(doPageIn(force));
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java?rev=1003096&r1=1003095&r2=1003096&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java Thu Sep 30 15:07:55 2010
@@ -65,61 +65,9 @@ public class QueueDispatchSelector exten
        
         boolean result =  super.canDispatch(subscription, m);
         if (result && !subscription.isBrowser()) {
-            result = exclusiveConsumer == null
-                    || exclusiveConsumer == subscription;
-            if (result) {
-                QueueMessageReference node = (QueueMessageReference) m;
-                // Keep message groups together.
-                String groupId = node.getGroupID();
-                int sequence = node.getGroupSequence();
-                if (groupId != null) {
-                    MessageGroupMap messageGroupOwners = ((Queue) node
-                            .getRegionDestination()).getMessageGroupOwners();
-
-                    // If we can own the first, then no-one else should own the
-                    // rest.
-                    if (sequence == 1) {
-                        assignGroup(subscription, messageGroupOwners, node,groupId);
-                    }else {
-    
-                        // Make sure that the previous owner is still valid, we may
-                        // need to become the new owner.
-                        ConsumerId groupOwner;
-    
-                        groupOwner = messageGroupOwners.get(groupId);
-                        if (groupOwner == null) {
-                            assignGroup(subscription, messageGroupOwners, node,groupId);
-                        } else {
-                            if (groupOwner.equals(subscription.getConsumerInfo().getConsumerId())) {
-                                // A group sequence < 1 is an end of group signal.
-                                if (sequence < 0) {
-                                    messageGroupOwners.removeGroup(groupId);
-                                }
-                            } else {
-                                result = false;
-                            }
-                        }
-                    }
-                }
-            }
+            result = exclusiveConsumer == null || exclusiveConsumer == subscription;
         }
         return result;
     }
     
-    protected void assignGroup(Subscription subs,MessageGroupMap messageGroupOwners, MessageReference n, String groupId) throws IOException {
-        messageGroupOwners.put(groupId, subs.getConsumerInfo().getConsumerId());
-        Message message = n.getMessage();
-        if (message instanceof ActiveMQMessage) {
-            ActiveMQMessage activeMessage = (ActiveMQMessage)message;
-            try {
-                activeMessage.setBooleanProperty("JMSXGroupFirstForConsumer", true, false);
-            } catch (JMSException e) {
-                LOG.warn("Failed to set boolean header: " + e, e);
-            }
-        }
-    }
-    
-    
-    
-    
 }

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/MessageGroupCloseTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/MessageGroupCloseTest.java?rev=1003096&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/MessageGroupCloseTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/MessageGroupCloseTest.java Thu Sep 30 15:07:55 2010
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import javax.jms.*;
+import javax.jms.Queue;
+import java.util.*;
+import java.util.concurrent.CountDownLatch;
+
+/*
+ * Test plan:
+ * Producer: publish messages into a queue, with 10 message groups, closing the group with seq=-1 on message 5 and message 10
+ * Consumers: 2 consumers created after all messages are sent
+ *
+ * Expected: for each group, messages 1-5 are handled by one consumer and messages 6-10 are handled by the other consumer.  Messages
+ * 1 and 6 have the JMSXGroupFirstForConsumer property set to true.
+ */
+public class MessageGroupCloseTest extends TestCase {
+    private static final Log LOG = LogFactory.getLog(MessageGroupCloseTest.class);
+    private Connection connection;
+    // Released once the producer is done (successfully or not) so the consumers can start
+    private CountDownLatch latchMessagesCreated = new CountDownLatch(1);
+
+    private int messagesSent, messagesRecvd1, messagesRecvd2, messageGroupCount, errorCountFirstForConsumer, errorCountWrongConsumerClose, errorCountDuplicateClose;
+    // groupID -> number of messages received for that group, one map per consumer
+    private HashMap<String, Integer> messageGroups1 = new HashMap<String, Integer>();
+    private HashMap<String, Integer> messageGroups2 = new HashMap<String, Integer>();
+    private HashSet<String> closedGroups1 = new HashSet<String>();
+    private HashSet<String> closedGroups2 = new HashSet<String>();
+    // prefetch is forced to 1: with a large prefetch the bug does not reproduce
+    private static final String connStr =
+        "vm://localhost?broker.persistent=false&broker.useJmx=false&jms.prefetchPolicy.all=1";
+
+    /** Sends every message first, then drains the queue with two competing consumers and checks group handling. */
+    public void testNewConsumer() throws JMSException, InterruptedException {
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connStr);
+        connection = factory.createConnection();
+        connection.start();
+        final String queueName = this.getClass().getSimpleName();
+        final Thread producerThread = new Thread() {
+            public void run() {
+                try {
+                    Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+                    Queue queue = session.createQueue(queueName);
+                    MessageProducer prod = session.createProducer(queue);
+                    for (int i=0; i<10; i++) {
+                        for (int j=0; j<10; j++) {
+                            int seq = j + 1;
+                            if ((j+1) % 5 == 0) {
+                                seq = -1;
+                            }
+                            Message message = generateMessage(session, Integer.toString(i), seq);
+                            prod.send(message);
+                            session.commit();
+                            messagesSent++;
+                            LOG.info("Sent message: group=" + i + ", seq="+ seq);
+                        }
+                        if (i % 100 == 0) {
+                            LOG.info("Sent messages: group=" + i);
+                        }
+                        messageGroupCount++;
+                    }
+                    LOG.info(messagesSent+" messages sent");
+                    prod.close();
+                    session.close();
+                } catch (Exception e) {
+                    LOG.error("Producer failed", e);
+                } finally {
+                    latchMessagesCreated.countDown(); // released even on failure so the consumers never block forever
+                }
+            }
+        };
+        final Thread consumerThread1 = new Thread() {
+            public void run() {
+                try {
+                    latchMessagesCreated.await();
+                    LOG.info("starting consumer1");
+                    Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+                    Queue queue = session.createQueue(queueName);
+                    MessageConsumer con1 = session.createConsumer(queue);
+                    while(true) {
+                        Message message = con1.receive(5000);
+                        if (message == null) break;
+                        LOG.info("Con1: got message "+formatMessage(message));
+                        checkMessage(message, "Con1", messageGroups1, closedGroups1);
+                        session.commit();
+                        messagesRecvd1++;
+                        if (messagesRecvd1 % 100 == 0) {
+                            LOG.info("Con1: got messages count=" + messagesRecvd1);
+                        }
+                    }
+                    LOG.info("Con1: total messages=" + messagesRecvd1);
+                    LOG.info("Con1: total message groups=" + messageGroups1.size());
+                    con1.close();
+                    session.close();
+                } catch (Exception e) {
+                    LOG.error("Consumer 1 failed", e);
+                }
+            }
+        };
+        final Thread consumerThread2 = new Thread() {
+            public void run() {
+                try {
+                    latchMessagesCreated.await();
+                    LOG.info("starting consumer2");
+                    Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+                    Queue queue = session.createQueue(queueName);
+                    MessageConsumer con2 = session.createConsumer(queue);
+                    while(true) {
+                        Message message = con2.receive(5000);
+                        if (message == null) { break; }
+                        LOG.info("Con2: got message "+formatMessage(message));
+                        checkMessage(message, "Con2", messageGroups2, closedGroups2);
+                        session.commit();
+                        messagesRecvd2++;
+                        if (messagesRecvd2 % 100 == 0) {
+                            LOG.info("Con2: got messages count=" + messagesRecvd2);
+                        }
+                    }
+                    con2.close();
+                    session.close();
+                    LOG.info("Con2: total messages=" + messagesRecvd2);
+                    LOG.info("Con2: total message groups=" + messageGroups2.size());
+                } catch (Exception e) {
+                    LOG.error("Consumer 2 failed", e);
+                }
+            }
+        };
+        consumerThread2.start();
+        consumerThread1.start();
+        producerThread.start();
+        // wait for threads to finish
+        producerThread.join();
+        consumerThread1.join();
+        consumerThread2.join();
+        connection.close();
+        // check results
+        assertEquals("consumers should get all the messages", messagesSent, messagesRecvd1 + messagesRecvd2);
+        assertEquals("not all message groups closed for consumer 1", messageGroups1.size(), closedGroups1.size());
+        assertEquals("not all message groups closed for consumer 2", messageGroups2.size(), closedGroups2.size());
+        assertTrue("producer failed to send any messages", messagesSent > 0);
+        assertEquals("JMSXGroupFirstForConsumer not set", 0, errorCountFirstForConsumer);
+        assertEquals("wrong consumer got close message", 0, errorCountWrongConsumerClose);
+        assertEquals("consumer got duplicate close message", 0, errorCountDuplicateClose);
+    }
+
+    /** Builds a text message carrying the given JMSXGroupID and JMSXGroupSeq properties. */
+    public Message generateMessage(Session session, String groupId, int seq) throws JMSException {
+        TextMessage m = session.createTextMessage();
+        m.setJMSType("TEST_MESSAGE");
+        m.setStringProperty("JMSXGroupID", groupId);
+        m.setIntProperty("JMSXGroupSeq", seq);
+        m.setText("<?xml?><testMessage/>");
+        return m;
+    }
+    /** Renders group id and sequence for logging; a failure to read them is returned as text instead of thrown. */
+    public String formatMessage(Message m) {
+        try {
+            return "group="+m.getStringProperty("JMSXGroupID")+", seq="+m.getIntProperty("JMSXGroupSeq");
+        } catch (Exception e) {
+            return e.getClass().getSimpleName()+": "+e.getMessage();
+        }
+    }
+    /** Checks one received message against a consumer's group bookkeeping; synchronized because both consumer threads bump the shared error counters. */
+    public synchronized void checkMessage(Message m, String consumerId, Map<String, Integer> messageGroups, Set<String> closedGroups) throws JMSException {
+        String groupId = m.getStringProperty("JMSXGroupID");
+        int seq = m.getIntProperty("JMSXGroupSeq");
+        Integer count = messageGroups.get(groupId);
+        if (count == null) {
+            // first time seeing this group
+            if (!m.propertyExists("JMSXGroupFirstForConsumer") ||
+                !m.getBooleanProperty("JMSXGroupFirstForConsumer")) {
+                LOG.info(consumerId + ": JMSXGroupFirstForConsumer not set for group=" + groupId + ", seq=" +seq);
+                errorCountFirstForConsumer++;
+            }
+            if (seq == -1) {
+                closedGroups.add(groupId);
+                LOG.info(consumerId + ": wrong consumer got close message for group=" + groupId);
+                errorCountWrongConsumerClose++;
+            }
+            messageGroups.put(groupId, 1);
+        } else {
+            // existing group
+            if (closedGroups.contains(groupId)) {
+                // group reassigned to same consumer
+                closedGroups.remove(groupId);
+                if (!m.propertyExists("JMSXGroupFirstForConsumer") ||
+                    !m.getBooleanProperty("JMSXGroupFirstForConsumer")) {
+                    LOG.info(consumerId + ": JMSXGroupFirstForConsumer not set for group=" + groupId + ", seq=" +seq);
+                    errorCountFirstForConsumer++;
+                }
+                if (seq == -1) {
+                    LOG.info(consumerId + ": consumer got duplicate close message for group=" + groupId);
+                    errorCountDuplicateClose++;
+                }
+            }
+            if (seq == -1) {
+                closedGroups.add(groupId);
+            }
+            messageGroups.put(groupId, count + 1);
+        }
+    }
+}
\ No newline at end of file