You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2010/07/14 00:40:58 UTC
svn commit: r963894 - in /activemq/trunk:
activemq-core/src/main/java/org/apache/activemq/broker/region/
activemq-core/src/test/java/org/apache/activemq/
activemq-spring/src/test/java/org/apache/bugs/
Author: gtully
Date: Tue Jul 13 22:40:58 2010
New Revision: 963894
URL: http://svn.apache.org/viewvc?rev=963894&view=rev
Log:
rework fix for https://issues.apache.org/activemq/browse/AMQ-1730 - better support for unordered message consumption when there are multiple short lived consumers, eg with spring mlc and concurrent consumers > 1
Added:
activemq/trunk/activemq-spring/src/test/java/org/apache/bugs/AMQ1730Test.java (with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=963894&r1=963893&r2=963894&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Tue Jul 13 22:40:58 2010
@@ -467,15 +467,37 @@ public class Queue extends BaseDestinati
// redeliver inflight messages
- for (MessageReference ref : sub.remove(context, this)) {
+ boolean markAsRedelivered = false;
+ MessageReference lastDeliveredRef = null;
+ List<MessageReference> unAckedMessages = sub.remove(context, this);
+
+ // locate last redelivered in unconsumed list (list in delivery rather than seq order)
+ if (lastDeiveredSequenceId != 0) {
+ for (MessageReference ref : unAckedMessages) {
+ if (ref.getMessageId().getBrokerSequenceId() == lastDeiveredSequenceId) {
+ lastDeliveredRef = ref;
+ markAsRedelivered = true;
+ LOG.debug("found lastDeliveredSeqID: " + lastDeiveredSequenceId + ", message reference: " + ref.getMessageId());
+ break;
+ }
+ }
+ }
+ for (MessageReference ref : unAckedMessages) {
QueueMessageReference qmr = (QueueMessageReference) ref;
if (qmr.getLockOwner() == sub) {
qmr.unlock();
- // only increment redelivery if it was delivered or we
+
// have no delivery information
- if (lastDeiveredSequenceId == 0
- || qmr.getMessageId().getBrokerSequenceId() <= lastDeiveredSequenceId) {
+ if (lastDeiveredSequenceId == 0) {
qmr.incrementRedeliveryCounter();
+ } else {
+ if (markAsRedelivered) {
+ qmr.incrementRedeliveryCounter();
+ }
+ if (ref == lastDeliveredRef) {
+ // all that follow were not redelivered
+ markAsRedelivered = false;
+ }
}
}
redeliveredWaitingDispatch.add(qmr);
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java?rev=963894&r1=963893&r2=963894&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java Tue Jul 13 22:40:58 2010
@@ -807,7 +807,7 @@ public class JMSConsumerTest extends Jms
Message msg = redispatchConsumer.receive(1000);
assertNotNull(msg);
- assertTrue(msg.getJMSRedelivered());
+ assertTrue("redelivered flag set", msg.getJMSRedelivered());
assertEquals(2, msg.getLongProperty("JMSXDeliveryCount"));
msg = redispatchConsumer.receive(1000);
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java?rev=963894&r1=963893&r2=963894&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java Tue Jul 13 22:40:58 2010
@@ -98,7 +98,7 @@ public class JmsRollbackRedeliveryTest e
session.commit();
} else {
LOG.info("Rollback message " + msg.getText() + " id: " + msg.getJMSMessageID());
- assertFalse(msg.getJMSRedelivered());
+ assertFalse("should not have redelivery flag set, id: " + msg.getJMSMessageID(), msg.getJMSRedelivered());
session.rollback();
}
}
Added: activemq/trunk/activemq-spring/src/test/java/org/apache/bugs/AMQ1730Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-spring/src/test/java/org/apache/bugs/AMQ1730Test.java?rev=963894&view=auto
==============================================================================
--- activemq/trunk/activemq-spring/src/test/java/org/apache/bugs/AMQ1730Test.java (added)
+++ activemq/trunk/activemq-spring/src/test/java/org/apache/bugs/AMQ1730Test.java Tue Jul 13 22:40:58 2010
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bugs;
+
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.springframework.jms.listener.DefaultMessageListenerContainer;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.util.concurrent.CountDownLatch;
+
+
+public class AMQ1730Test extends TestCase {
+
+ private static final Log log = LogFactory.getLog(AMQ1730Test.class);
+
+
+ private static final String JMSX_DELIVERY_COUNT = "JMSXDeliveryCount";
+
+
+ BrokerService brokerService;
+
+ private static final int MESSAGE_COUNT = 250;
+
+ public AMQ1730Test() {
+ super();
+ }
+
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ brokerService = new BrokerService();
+ brokerService.addConnector("tcp://localhost:0");
+ brokerService.setUseJmx(false);
+ brokerService.start();
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ super.tearDown();
+ brokerService.stop();
+ }
+
+ public void testRedelivery() throws Exception {
+
+ ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
+ brokerService.getTransportConnectors().get(0).getConnectUri().toString() + "?jms.prefetchPolicy.queuePrefetch=100");
+
+ Connection connection = connectionFactory.createConnection();
+ connection.start();
+
+ Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ Queue queue = session.createQueue("queue.test");
+
+ MessageProducer producer = session.createProducer(queue);
+
+ for (int i = 0; i < MESSAGE_COUNT; i++) {
+ log.info("Sending message " + i);
+ TextMessage message = session.createTextMessage("Message " + i);
+ producer.send(message);
+ }
+
+ producer.close();
+ session.close();
+ connection.stop();
+ connection.close();
+
+ final CountDownLatch countDownLatch = new CountDownLatch(MESSAGE_COUNT);
+
+ final ValueHolder<Boolean> messageRedelivered = new ValueHolder<Boolean>(false);
+
+ DefaultMessageListenerContainer messageListenerContainer = new DefaultMessageListenerContainer();
+ messageListenerContainer.setConnectionFactory(connectionFactory);
+ messageListenerContainer.setDestination(queue);
+ messageListenerContainer.setAutoStartup(false);
+ messageListenerContainer.setConcurrentConsumers(1);
+ messageListenerContainer.setMaxConcurrentConsumers(16);
+ messageListenerContainer.setMaxMessagesPerTask(10);
+ messageListenerContainer.setReceiveTimeout(10000);
+ messageListenerContainer.setRecoveryInterval(5000);
+ messageListenerContainer.setAcceptMessagesWhileStopping(false);
+ messageListenerContainer.setCacheLevel(DefaultMessageListenerContainer.CACHE_NONE);
+ messageListenerContainer.setSessionTransacted(false);
+ messageListenerContainer.setMessageListener(new MessageListener() {
+
+
+ public void onMessage(Message message) {
+ if (!(message instanceof TextMessage)) {
+ throw new RuntimeException();
+ }
+ try {
+ TextMessage textMessage = (TextMessage) message;
+ String text = textMessage.getText();
+ int messageDeliveryCount = message.getIntProperty(JMSX_DELIVERY_COUNT);
+ if (messageDeliveryCount > 1) {
+ messageRedelivered.set(true);
+ }
+ log.info("[Count down latch: " + countDownLatch.getCount() + "][delivery count: " + messageDeliveryCount + "] - " + "Received message with id: " + message.getJMSMessageID() + " with text: " + text);
+
+ } catch (JMSException e) {
+ e.printStackTrace();
+ }
+ finally {
+ countDownLatch.countDown();
+ }
+ }
+
+ });
+ messageListenerContainer.afterPropertiesSet();
+
+ messageListenerContainer.start();
+
+ countDownLatch.await();
+ messageListenerContainer.stop();
+ messageListenerContainer.destroy();
+
+ assertFalse("no message has redelivery > 1", messageRedelivered.get());
+ }
+
+ private class ValueHolder<T> {
+
+ private T value;
+
+ public ValueHolder(T value) {
+ super();
+ this.value = value;
+ }
+
+ void set(T value) {
+ this.value = value;
+ }
+
+ T get() {
+ return value;
+ }
+
+ }
+
+}
Propchange: activemq/trunk/activemq-spring/src/test/java/org/apache/bugs/AMQ1730Test.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-spring/src/test/java/org/apache/bugs/AMQ1730Test.java
------------------------------------------------------------------------------
svn:keywords = Rev Date