You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2010/07/14 00:40:58 UTC

svn commit: r963894 - in /activemq/trunk: activemq-core/src/main/java/org/apache/activemq/broker/region/ activemq-core/src/test/java/org/apache/activemq/ activemq-spring/src/test/java/org/apache/bugs/

Author: gtully
Date: Tue Jul 13 22:40:58 2010
New Revision: 963894

URL: http://svn.apache.org/viewvc?rev=963894&view=rev
Log:
rework fix for https://issues.apache.org/activemq/browse/AMQ-1730 - better support for unordered message consumption when there are multiple short lived consumers, eg with spring mlc and concurrent consumers > 1

Added:
    activemq/trunk/activemq-spring/src/test/java/org/apache/bugs/AMQ1730Test.java   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=963894&r1=963893&r2=963894&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Tue Jul 13 22:40:58 2010
@@ -467,15 +467,37 @@ public class Queue extends BaseDestinati
 
                 // redeliver inflight messages
 
-                for (MessageReference ref : sub.remove(context, this)) {
+                boolean markAsRedelivered = false;
+                MessageReference lastDeliveredRef = null;
+                List<MessageReference> unAckedMessages = sub.remove(context, this);
+
+                // locate last redelivered in unconsumed list (list in delivery rather than seq order)
+                if (lastDeiveredSequenceId != 0) {
+                    for (MessageReference ref : unAckedMessages) {
+                        if (ref.getMessageId().getBrokerSequenceId() == lastDeiveredSequenceId) {
+                            lastDeliveredRef = ref;
+                            markAsRedelivered = true;
+                            LOG.debug("found lastDeliveredSeqID: " + lastDeiveredSequenceId + ", message reference: " + ref.getMessageId());
+                            break;
+                        }
+                    }
+                }
+                for (MessageReference ref : unAckedMessages) {
                     QueueMessageReference qmr = (QueueMessageReference) ref;
                     if (qmr.getLockOwner() == sub) {
                         qmr.unlock();
-                        // only increment redelivery if it was delivered or we
+
                         // have no delivery information
-                        if (lastDeiveredSequenceId == 0
-                                || qmr.getMessageId().getBrokerSequenceId() <= lastDeiveredSequenceId) {
+                        if (lastDeiveredSequenceId == 0) {
                             qmr.incrementRedeliveryCounter();
+                        } else {
+                            if (markAsRedelivered) {
+                                qmr.incrementRedeliveryCounter();
+                            }
+                            if (ref == lastDeliveredRef) {
+                                // all that follow were not redelivered
+                                markAsRedelivered = false;
+                            }
                         }
                     }
                     redeliveredWaitingDispatch.add(qmr);

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java?rev=963894&r1=963893&r2=963894&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java Tue Jul 13 22:40:58 2010
@@ -807,7 +807,7 @@ public class JMSConsumerTest extends Jms
                 
         Message msg = redispatchConsumer.receive(1000);
         assertNotNull(msg);
-        assertTrue(msg.getJMSRedelivered());
+        assertTrue("redelivered flag set", msg.getJMSRedelivered());
         assertEquals(2, msg.getLongProperty("JMSXDeliveryCount"));
         
         msg = redispatchConsumer.receive(1000);

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java?rev=963894&r1=963893&r2=963894&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java Tue Jul 13 22:40:58 2010
@@ -98,7 +98,7 @@ public class JmsRollbackRedeliveryTest e
                         session.commit();
                     } else {
                         LOG.info("Rollback message " + msg.getText() + " id: " +  msg.getJMSMessageID());
-                        assertFalse(msg.getJMSRedelivered());
+                        assertFalse("should not have redelivery flag set, id: " + msg.getJMSMessageID(), msg.getJMSRedelivered());
                         session.rollback();
                     }
                 }

Added: activemq/trunk/activemq-spring/src/test/java/org/apache/bugs/AMQ1730Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-spring/src/test/java/org/apache/bugs/AMQ1730Test.java?rev=963894&view=auto
==============================================================================
--- activemq/trunk/activemq-spring/src/test/java/org/apache/bugs/AMQ1730Test.java (added)
+++ activemq/trunk/activemq-spring/src/test/java/org/apache/bugs/AMQ1730Test.java Tue Jul 13 22:40:58 2010
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bugs;
+
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.springframework.jms.listener.DefaultMessageListenerContainer;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.util.concurrent.CountDownLatch;
+
+
+public class AMQ1730Test extends TestCase {
+
+    private static final Log log = LogFactory.getLog(AMQ1730Test.class);
+
+
+    private static final String JMSX_DELIVERY_COUNT = "JMSXDeliveryCount";
+
+
+    BrokerService brokerService;
+
+    private static final int MESSAGE_COUNT = 250;
+
+    public AMQ1730Test() {
+        super();
+    }
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        brokerService = new BrokerService();
+        brokerService.addConnector("tcp://localhost:0");
+        brokerService.setUseJmx(false);
+        brokerService.start();
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        super.tearDown();
+        brokerService.stop();
+    }
+
+    public void testRedelivery() throws Exception {
+
+        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
+                brokerService.getTransportConnectors().get(0).getConnectUri().toString() + "?jms.prefetchPolicy.queuePrefetch=100");
+
+        Connection connection = connectionFactory.createConnection();
+        connection.start();
+
+        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        Queue queue = session.createQueue("queue.test");
+
+        MessageProducer producer = session.createProducer(queue);
+
+        for (int i = 0; i < MESSAGE_COUNT; i++) {
+            log.info("Sending message " + i);
+            TextMessage message = session.createTextMessage("Message " + i);
+            producer.send(message);
+        }
+
+        producer.close();
+        session.close();
+        connection.stop();
+        connection.close();
+
+        final CountDownLatch countDownLatch = new CountDownLatch(MESSAGE_COUNT);
+
+        final ValueHolder<Boolean> messageRedelivered = new ValueHolder<Boolean>(false);
+
+        DefaultMessageListenerContainer messageListenerContainer = new DefaultMessageListenerContainer();
+        messageListenerContainer.setConnectionFactory(connectionFactory);
+        messageListenerContainer.setDestination(queue);
+        messageListenerContainer.setAutoStartup(false);
+        messageListenerContainer.setConcurrentConsumers(1);
+        messageListenerContainer.setMaxConcurrentConsumers(16);
+        messageListenerContainer.setMaxMessagesPerTask(10);
+        messageListenerContainer.setReceiveTimeout(10000);
+        messageListenerContainer.setRecoveryInterval(5000);
+        messageListenerContainer.setAcceptMessagesWhileStopping(false);
+        messageListenerContainer.setCacheLevel(DefaultMessageListenerContainer.CACHE_NONE);
+        messageListenerContainer.setSessionTransacted(false);
+        messageListenerContainer.setMessageListener(new MessageListener() {
+
+
+            public void onMessage(Message message) {
+                if (!(message instanceof TextMessage)) {
+                    throw new RuntimeException();
+                }
+                try {
+                    TextMessage textMessage = (TextMessage) message;
+                    String text = textMessage.getText();
+                    int messageDeliveryCount = message.getIntProperty(JMSX_DELIVERY_COUNT);
+                    if (messageDeliveryCount > 1) {
+                        messageRedelivered.set(true);
+                    }
+                    log.info("[Count down latch: " + countDownLatch.getCount() + "][delivery count: " + messageDeliveryCount + "] - " + "Received message with id: " + message.getJMSMessageID() + " with text: " + text);
+
+                } catch (JMSException e) {
+                    e.printStackTrace();
+                }
+                finally {
+                    countDownLatch.countDown();
+                }
+            }
+
+        });
+        messageListenerContainer.afterPropertiesSet();
+
+        messageListenerContainer.start();
+
+        countDownLatch.await();
+        messageListenerContainer.stop();
+        messageListenerContainer.destroy();
+
+        assertFalse("no message has redelivery > 1", messageRedelivered.get());
+    }
+
+    private class ValueHolder<T> {
+
+        private T value;
+
+        public ValueHolder(T value) {
+            super();
+            this.value = value;
+        }
+
+        void set(T value) {
+            this.value = value;
+        }
+
+        T get() {
+            return value;
+        }
+
+    }
+
+}

Propchange: activemq/trunk/activemq-spring/src/test/java/org/apache/bugs/AMQ1730Test.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-spring/src/test/java/org/apache/bugs/AMQ1730Test.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date