You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2008/12/11 17:36:59 UTC

svn commit: r725737 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ActiveMQMessageConsumer.java test/java/org/apache/activemq/test/rollback/CloseRollbackRedeliveryQueueTest.java

Author: gtully
Date: Thu Dec 11 08:36:59 2008
New Revision: 725737

URL: http://svn.apache.org/viewvc?rev=725737&view=rev
Log:
fix AMQ-2032 - consumer.dispose was clearing delivered messages even if in a tx

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/rollback/CloseRollbackRedeliveryQueueTest.java   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=725737&r1=725736&r2=725737&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Thu Dec 11 08:36:59 2008
@@ -676,8 +676,10 @@
                     }
                 }
             }
-            synchronized(deliveredMessages) {
-                deliveredMessages.clear();
+            if (!session.isTransacted()) {
+                synchronized(deliveredMessages) {
+                    deliveredMessages.clear();
+                }
             }
             List<MessageDispatch> list = unconsumedMessages.removeAll();
             if (!this.info.isBrowser()) {
@@ -927,16 +929,19 @@
                     return;
                 }
     
-                // Only increase the redlivery delay after the first redelivery..
+                // Only increase the redelivery delay after the first redelivery..
                 MessageDispatch lastMd = deliveredMessages.getFirst();
-                if (lastMd.getMessage().getRedeliveryCounter() > 0) {
+                final int currentRedeliveryCount = lastMd.getMessage().getRedeliveryCounter();
+                if (currentRedeliveryCount > 0) {
                     redeliveryDelay = redeliveryPolicy.getRedeliveryDelay(redeliveryDelay);
                 }
                 MessageId firstMsgId = deliveredMessages.getLast().getMessage().getMessageId();
     
-                for (Iterator iter = deliveredMessages.iterator(); iter.hasNext();) {
-                    MessageDispatch md = (MessageDispatch)iter.next();
+                for (Iterator<MessageDispatch> iter = deliveredMessages.iterator(); iter.hasNext();) {
+                    MessageDispatch md = iter.next();
                     md.getMessage().onMessageRolledBack();
+                    // ensure we don't filter this as a duplicate
+                    session.connection.rollbackDuplicate(this, md.getMessage());
                 }
     
                 if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
@@ -948,26 +953,27 @@
                     MessageAck ack = new MessageAck(lastMd, MessageAck.POSION_ACK_TYPE, deliveredMessages.size());
 					ack.setFirstMessageId(firstMsgId);
                     session.sendAck(ack,true);
-                    // ensure we don't filter this as a duplicate
-                    session.connection.rollbackDuplicate(this, lastMd.getMessage());
                     // Adjust the window size.
                     additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size());
                     redeliveryDelay = 0;
                 } else {
                     
-                    MessageAck ack = new MessageAck(lastMd, MessageAck.REDELIVERED_ACK_TYPE, deliveredMessages.size());
-                    ack.setFirstMessageId(firstMsgId);
-                    session.sendAck(ack,true);
+                    // only redelivery_ack after first delivery
+                    if (currentRedeliveryCount > 0) {
+                        MessageAck ack = new MessageAck(lastMd, MessageAck.REDELIVERED_ACK_TYPE, deliveredMessages.size());
+                        ack.setFirstMessageId(firstMsgId);
+                        session.sendAck(ack,true);
+                    }
     
                     // stop the delivery of messages.
                     unconsumedMessages.stop();
     
-                    for (Iterator iter = deliveredMessages.iterator(); iter.hasNext();) {
-                        MessageDispatch md = (MessageDispatch)iter.next();
+                    for (Iterator<MessageDispatch> iter = deliveredMessages.iterator(); iter.hasNext();) {
+                        MessageDispatch md = iter.next();
                         unconsumedMessages.enqueueFirst(md);
                     }
     
-                    if (redeliveryDelay > 0) {
+                    if (redeliveryDelay > 0 && !unconsumedMessages.isClosed()) {
                         // Start up the delivery again a little later.
                         scheduler.executeAfterDelay(new Runnable() {
                             public void run() {

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/rollback/CloseRollbackRedeliveryQueueTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/rollback/CloseRollbackRedeliveryQueueTest.java?rev=725737&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/rollback/CloseRollbackRedeliveryQueueTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/rollback/CloseRollbackRedeliveryQueueTest.java Thu Dec 11 08:36:59 2008
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.test.rollback;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.springframework.jms.core.MessageCreator;
+
+/**
+ * Verifies that a message received in a transacted session is redelivered after the
+ * session is closed without committing, when the connection uses the failover transport.
+ */
+public class CloseRollbackRedeliveryQueueTest extends EmbeddedBrokerTestSupport {
+
+    private static final Log LOG = LogFactory.getLog(CloseRollbackRedeliveryQueueTest.class);
+
+    /** Number of messages sent to the destination during {@link #setUp()}. */
+    protected int numberOfMessagesOnQueue = 1;
+    private Connection connection;
+
+    /**
+     * Receives a message in a transacted session, closes the session without committing,
+     * then checks that a new transacted session receives the same message again.
+     */
+    public void testVerifyCloseRedeliveryWithFailoverTransport() throws Exception {
+        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        MessageConsumer consumer = session.createConsumer(destination);
+
+        Message message = consumer.receive(1000);
+        // check for a missed delivery before dereferencing, so the test fails with an
+        // assertion rather than a NullPointerException
+        assertNotNull(message);
+        String id = message.getJMSMessageID();
+        LOG.info("got message " + message);
+        // close will rollback the current tx
+        session.close();
+
+        session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        consumer = session.createConsumer(destination);
+
+        message = consumer.receive(1000);
+        session.commit();
+        assertNotNull(message);
+        assertEquals("redelivered message", id, message.getJMSMessageID());
+        // NOTE(review): expected delivery count of 3 is kept from the original test; confirm
+        assertEquals(3, message.getLongProperty("JMSXDeliveryCount"));
+    }
+
+    /** Starts the connection and sends {@code numberOfMessagesOnQueue} messages. */
+    protected void setUp() throws Exception {
+        super.setUp();
+
+        connection = createConnection();
+        connection.start();
+
+        // lets fill the queue up
+        for (int i = 0; i < numberOfMessagesOnQueue; i++) {
+            template.send(createMessageCreator(i));
+        }
+    }
+
+    /** Creates a connection factory that uses the failover transport over {@code bindAddress}. */
+    protected ConnectionFactory createConnectionFactory() throws Exception {
+        // failover: enables message audit - which could get in the way of redelivery
+        return new ActiveMQConnectionFactory("failover:" + bindAddress);
+    }
+
+    /** Closes the test connection, then runs the superclass tear-down even if the close fails. */
+    protected void tearDown() throws Exception {
+        try {
+            if (connection != null) {
+                connection.close();
+            }
+        } finally {
+            super.tearDown();
+        }
+    }
+
+    /**
+     * Builds a creator for a text message with body {@code "Message: " + i} and an int
+     * property {@code Counter} set to {@code i}.
+     */
+    protected MessageCreator createMessageCreator(final int i) {
+        return new MessageCreator() {
+            public Message createMessage(Session session) throws JMSException {
+                TextMessage answer = session.createTextMessage("Message: " + i);
+                answer.setIntProperty("Counter", i);
+                return answer;
+            }
+        };
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/rollback/CloseRollbackRedeliveryQueueTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/rollback/CloseRollbackRedeliveryQueueTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date