You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2008/12/11 17:36:59 UTC
svn commit: r725737 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/ActiveMQMessageConsumer.java
test/java/org/apache/activemq/test/rollback/CloseRollbackRedeliveryQueueTest.java
Author: gtully
Date: Thu Dec 11 08:36:59 2008
New Revision: 725737
URL: http://svn.apache.org/viewvc?rev=725737&view=rev
Log:
fix AMQ-2032 - consumer.dispose was clearing delivered messages even if in a tx
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/rollback/CloseRollbackRedeliveryQueueTest.java (with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=725737&r1=725736&r2=725737&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Thu Dec 11 08:36:59 2008
@@ -676,8 +676,10 @@
}
}
}
- synchronized(deliveredMessages) {
- deliveredMessages.clear();
+ if (!session.isTransacted()) {
+ synchronized(deliveredMessages) {
+ deliveredMessages.clear();
+ }
}
List<MessageDispatch> list = unconsumedMessages.removeAll();
if (!this.info.isBrowser()) {
@@ -927,16 +929,19 @@
return;
}
- // Only increase the redlivery delay after the first redelivery..
+ // Only increase the redelivery delay after the first redelivery..
MessageDispatch lastMd = deliveredMessages.getFirst();
- if (lastMd.getMessage().getRedeliveryCounter() > 0) {
+ final int currentRedeliveryCount = lastMd.getMessage().getRedeliveryCounter();
+ if (currentRedeliveryCount > 0) {
redeliveryDelay = redeliveryPolicy.getRedeliveryDelay(redeliveryDelay);
}
MessageId firstMsgId = deliveredMessages.getLast().getMessage().getMessageId();
- for (Iterator iter = deliveredMessages.iterator(); iter.hasNext();) {
- MessageDispatch md = (MessageDispatch)iter.next();
+ for (Iterator<MessageDispatch> iter = deliveredMessages.iterator(); iter.hasNext();) {
+ MessageDispatch md = iter.next();
md.getMessage().onMessageRolledBack();
+ // ensure we don't filter this as a duplicate
+ session.connection.rollbackDuplicate(this, md.getMessage());
}
if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
@@ -948,26 +953,27 @@
MessageAck ack = new MessageAck(lastMd, MessageAck.POSION_ACK_TYPE, deliveredMessages.size());
ack.setFirstMessageId(firstMsgId);
session.sendAck(ack,true);
- // ensure we don't filter this as a duplicate
- session.connection.rollbackDuplicate(this, lastMd.getMessage());
// Adjust the window size.
additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size());
redeliveryDelay = 0;
} else {
- MessageAck ack = new MessageAck(lastMd, MessageAck.REDELIVERED_ACK_TYPE, deliveredMessages.size());
- ack.setFirstMessageId(firstMsgId);
- session.sendAck(ack,true);
+ // only redelivery_ack after first delivery
+ if (currentRedeliveryCount > 0) {
+ MessageAck ack = new MessageAck(lastMd, MessageAck.REDELIVERED_ACK_TYPE, deliveredMessages.size());
+ ack.setFirstMessageId(firstMsgId);
+ session.sendAck(ack,true);
+ }
// stop the delivery of messages.
unconsumedMessages.stop();
- for (Iterator iter = deliveredMessages.iterator(); iter.hasNext();) {
- MessageDispatch md = (MessageDispatch)iter.next();
+ for (Iterator<MessageDispatch> iter = deliveredMessages.iterator(); iter.hasNext();) {
+ MessageDispatch md = iter.next();
unconsumedMessages.enqueueFirst(md);
}
- if (redeliveryDelay > 0) {
+ if (redeliveryDelay > 0 && !unconsumedMessages.isClosed()) {
// Start up the delivery again a little later.
scheduler.executeAfterDelay(new Runnable() {
public void run() {
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/rollback/CloseRollbackRedeliveryQueueTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/rollback/CloseRollbackRedeliveryQueueTest.java?rev=725737&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/rollback/CloseRollbackRedeliveryQueueTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/rollback/CloseRollbackRedeliveryQueueTest.java Thu Dec 11 08:36:59 2008
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.test.rollback;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.springframework.jms.core.MessageCreator;
+
+public class CloseRollbackRedeliveryQueueTest extends EmbeddedBrokerTestSupport {
+
+ private static final transient Log LOG = LogFactory.getLog(CloseRollbackRedeliveryQueueTest.class);
+
+ protected int numberOfMessagesOnQueue = 1;
+ private Connection connection;
+
+ public void testVerifyCloseRedeliveryWithFailoverTransport() throws Throwable {
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ MessageConsumer consumer = session.createConsumer(destination);
+
+ Message message = consumer.receive(1000);
+ String id = message.getJMSMessageID();
+ assertNotNull(message);
+ LOG.info("got message " + message);
+ // close will rollback the current tx
+ session.close();
+
+ session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ consumer = session.createConsumer(destination);
+
+ message = consumer.receive(1000);
+ session.commit();
+ assertNotNull(message);
+ assertEquals("redelivered message", id, message.getJMSMessageID());
+ assertEquals(3, message.getLongProperty("JMSXDeliveryCount"));
+ }
+
+ protected void setUp() throws Exception {
+ super.setUp();
+
+ connection = createConnection();
+ connection.start();
+
+ // lets fill the queue up
+ for (int i = 0; i < numberOfMessagesOnQueue; i++) {
+ template.send(createMessageCreator(i));
+ }
+
+ }
+
+ protected ConnectionFactory createConnectionFactory() throws Exception {
+ // failover: enables message audit - which could get in the way of redelivery
+ return new ActiveMQConnectionFactory("failover:" + bindAddress);
+ }
+
+ protected void tearDown() throws Exception {
+ if (connection != null) {
+ connection.close();
+ }
+ super.tearDown();
+ }
+
+ protected MessageCreator createMessageCreator(final int i) {
+ return new MessageCreator() {
+ public Message createMessage(Session session) throws JMSException {
+ TextMessage answer = session.createTextMessage("Message: " + i);
+ answer.setIntProperty("Counter", i);
+ return answer;
+ }
+ };
+ }
+}
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/rollback/CloseRollbackRedeliveryQueueTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/rollback/CloseRollbackRedeliveryQueueTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date