You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/09/27 20:17:47 UTC

svn commit: r1391152 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ActiveMQMessageConsumer.java test/java/org/apache/activemq/bugs/AMQ4083Test.java

Author: tabish
Date: Thu Sep 27 18:17:47 2012
New Revision: 1391152

URL: http://svn.apache.org/viewvc?rev=1391152&view=rev
Log:
fix and tests for: https://issues.apache.org/jira/browse/AMQ-4083

Revert the previous fix.  We need to ack each expired message as soon as we detect it, rather than overloading the delivered acks that are used to extend the prefetch window.  The problem was an edge case in which the broker detected a message as expired after the client had delivered it but before the client had acked it as consumed.

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ4083Test.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=1391152&r1=1391151&r2=1391152&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Thu Sep 27 18:17:47 2012
@@ -879,11 +879,8 @@ public class ActiveMQMessageConsumer imp
             return;
         }
         if (messageExpired) {
-            synchronized (deliveredMessages) {
-                deliveredMessages.remove(md);
-            }
+            acknowledge(md, MessageAck.DELIVERED_ACK_TYPE);
             stats.getExpiredMessageCount().increment();
-            ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
         } else {
             stats.onMessage();
             if (session.getTransacted()) {
@@ -1060,13 +1057,6 @@ public class ActiveMQMessageConsumer imp
                 ack.setTransactionId(session.getTransactionContext().getTransactionId());
             }
 
-            // if there is a pending delivered ack then we need to send that since there
-            // could be expired Messages in the ack which haven't been acked yet and the
-            // ack for all deliveries might not include those in its range of acks.  The
-            // pending standard acks will be included in the ack for all deliveries.
-            if (pendingAck != null && pendingAck.isDeliveredAck()) {
-                session.sendAck(pendingAck);
-            }
             pendingAck = null;
             session.sendAck(ack);
 
@@ -1137,7 +1127,11 @@ public class ActiveMQMessageConsumer imp
     }
 
     void acknowledge(MessageDispatch md) throws JMSException {
-        MessageAck ack = new MessageAck(md,MessageAck.INDIVIDUAL_ACK_TYPE,1);
+        acknowledge(md, MessageAck.INDIVIDUAL_ACK_TYPE);
+    }
+
+    void acknowledge(MessageDispatch md, byte ackType) throws JMSException {
+        MessageAck ack = new MessageAck(md, ackType, 1);
         session.sendAck(ack);
         synchronized(deliveredMessages){
             deliveredMessages.remove(md);

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ4083Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ4083Test.java?rev=1391152&r1=1391151&r2=1391152&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ4083Test.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ4083Test.java Thu Sep 27 18:17:47 2012
@@ -19,9 +19,12 @@ package org.apache.activemq.bugs;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+import java.util.ArrayList;
+import java.util.Date;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import javax.jms.Connection;
 import javax.jms.DeliveryMode;
 import javax.jms.JMSException;
 import javax.jms.Message;
@@ -33,6 +36,7 @@ import javax.management.ObjectName;
 
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQPrefetchPolicy;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.jmx.QueueViewMBean;
 import org.apache.activemq.command.ActiveMQQueue;
@@ -51,7 +55,10 @@ public class AMQ4083Test {
     private static String TEST_QUEUE = "testQueue";
     private static ActiveMQQueue queue = new ActiveMQQueue(TEST_QUEUE);
 
+    private final int messageCount = 100;
+
     private String connectionUri;
+    private String[] data;
 
     @Before
     public void setUp() throws Exception {
@@ -62,6 +69,12 @@ public class AMQ4083Test {
         connectionUri = brokerService.addConnector(BROKER_ADDRESS).getPublishableConnectString();
         brokerService.start();
         brokerService.waitUntilStarted();
+
+        data = new String[messageCount];
+
+        for (int i = 0; i < messageCount; i++) {
+            data[i] = "Text for message: " + i + " at " + new Date();
+        }
     }
 
     @After
@@ -431,6 +444,68 @@ public class AMQ4083Test {
         LOG.info("InFlight Count: {}", queueView.getInFlightCount());
     }
 
+    @Test
+    public void testConsumeExpiredQueueAndDlq() throws Exception {
+
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
+        Connection connection = factory.createConnection();
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        MessageProducer producerNormal = session.createProducer(queue);
+        MessageProducer producerExpire = session.createProducer(queue);
+        producerExpire.setTimeToLive(500);
+
+        MessageConsumer dlqConsumer = session.createConsumer(session.createQueue("ActiveMQ.DLQ"));
+        connection.start();
+
+        Connection consumerConnection = factory.createConnection();
+        ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
+        prefetchPolicy.setAll(10);
+        ((ActiveMQConnection)consumerConnection).setPrefetchPolicy(prefetchPolicy);
+        Session consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        MessageConsumer consumer = consumerSession.createConsumer(queue);
+        consumerConnection.start();
+
+        String msgBody = new String(new byte[20*1024]);
+        for (int i = 0; i < data.length; i++) {
+            Message message = session.createTextMessage(msgBody);
+            producerExpire.send(queue, message);
+        }
+
+        for (int i = 0; i < data.length; i++) {
+            Message message = session.createTextMessage(msgBody);
+            producerNormal.send(queue, message);
+        }
+
+        ArrayList<Message> messages = new ArrayList<Message>();
+        Message received;
+        while ((received = consumer.receive(1000)) != null) {
+            messages.add(received);
+            if (messages.size() == 1) {
+               TimeUnit.SECONDS.sleep(1);
+            }
+            received.acknowledge();
+        };
+
+        assertEquals("got messages", messageCount + 1, messages.size());
+
+        ArrayList<Message> dlqMessages = new ArrayList<Message>();
+        while ((received = dlqConsumer.receive(1000)) != null) {
+            dlqMessages.add(received);
+        };
+
+        assertEquals("got dlq messages", data.length - 1, dlqMessages.size());
+
+        final QueueViewMBean queueView = getProxyToQueueViewMBean();
+
+        LOG.info("Dequeued Count: {}", queueView.getDequeueCount());
+        LOG.info("Dispatch Count: {}", queueView.getDispatchCount());
+        LOG.info("Enqueue Count: {}", queueView.getEnqueueCount());
+        LOG.info("Expired Count: {}", queueView.getExpiredCount());
+        LOG.info("InFlight Count: {}", queueView.getInFlightCount());
+    }
+
     private QueueViewMBean getProxyToQueueViewMBean() throws Exception {
         final ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:Type=Queue,Destination="
                 + queue.getQueueName() + ",BrokerName=localhost");