You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2008/09/24 16:46:00 UTC

svn commit: r698595 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ActiveMQMessageConsumer.java main/java/org/apache/activemq/broker/region/PrefetchSubscription.java test/java/org/apache/activemq/JMSConsumerTest.java

Author: chirino
Date: Wed Sep 24 07:46:00 2008
New Revision: 698595

URL: http://svn.apache.org/viewvc?rev=698595&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-1951

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=698595&r1=698594&r2=698595&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Wed Sep 24 07:46:00 2008
@@ -125,6 +125,8 @@
     private MessageTransformer transformer;
     private boolean clearDispatchList;
 
+    private MessageAck pendingAck;
+
     /**
      * Create a MessageConsumer
      * 
@@ -615,6 +617,8 @@
             			ackCounter = 0;
             		}
             	}
+            } else {
+                ack = pendingAck;
             }
             if (ack != null) {
                 final MessageAck ackToSend = ack;
@@ -835,13 +839,19 @@
         // The delivered message list is only needed for the recover method
         // which is only used with client ack.
         deliveredCounter++;
+        
+        MessageAck oldPendingAck = pendingAck;
+        pendingAck = new MessageAck(md, ackType, deliveredCounter);
+        if( oldPendingAck==null ) {
+            pendingAck.setFirstMessageId(pendingAck.getLastMessageId());
+        } else {
+            pendingAck.setFirstMessageId(oldPendingAck.getFirstMessageId());
+        }
+        pendingAck.setTransactionId(session.getTransactionContext().getTransactionId());
+
         if ((0.5 * info.getPrefetchSize()) <= (deliveredCounter - additionalWindowSize)) {
-            MessageAck ack = new MessageAck(md, ackType, deliveredCounter);
-            if( !deliveredMessages.isEmpty() ) {
-            	ack.setFirstMessageId(deliveredMessages.getLast().getMessage().getMessageId());
-            }
-            ack.setTransactionId(session.getTransactionContext().getTransactionId());
-            session.sendAck(ack);
+            session.sendAck(pendingAck);
+            pendingAck=null;
             additionalWindowSize = deliveredCounter;
 
             // When using DUPS ok, we do a real ack.

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=698595&r1=698594&r2=698595&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Wed Sep 24 07:46:00 2008
@@ -216,18 +216,8 @@
                                                 throws Exception {
                                             synchronized(dispatchLock) {
                                                 dequeueCounter++;
-                                                node
-                                                        .getRegionDestination()
-                                                        .getDestinationStatistics()
-                                                        .getDequeues()
-                                                        .increment();
-
-                                                node
-                                                        .getRegionDestination()
-                                                        .getDestinationStatistics()
-                                                        .getInflight()
-                                                        .decrement();
-
+                                                node.getRegionDestination().getDestinationStatistics().getDequeues().increment();
+                                                node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
                                                 prefetchExtension--;
                                             }
                                         }
@@ -236,6 +226,9 @@
                                         	// Need to put it back in the front.
                                             synchronized(dispatchLock) {
                                         	    dispatched.add(0, node);
+                                            	// ActiveMQ workaround for AMQ-1730 - Please Ignore next line
+                                                node.incrementRedeliveryCounter();
+                                                node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
                                             }
                                         }
                                     });

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java?rev=698595&r1=698594&r2=698595&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java Wed Sep 24 07:46:00 2008
@@ -509,22 +509,21 @@
         sendMessages(session, destination, 2);
         session.commit();
 
-        // Only pick up the first message.
-        Message message1 = consumer.receive(1000);
-        assertNotNull(message1);
-
-        // Don't acknowledge yet. This should keep our prefetch full.
+        // The prefetch should fill up with 1 message.
         // Since prefetch is still full, the 2nd message should get dispatched
-        // to
-        // another consumer.. lets create the 2nd consumer test that it does
+        // to another consumer. Let's create the 2nd consumer to test that, and
         // make sure it does.
         ActiveMQConnection connection2 = (ActiveMQConnection)factory.createConnection();
         connections.add(connection2);
         Session session2 = connection2.createSession(true, 0);
-        session2.createConsumer(destination);
+        MessageConsumer consumer2 = session2.createConsumer(destination);
+
+        // Pick up the first message.
+        Message message1 = consumer.receive(1000);
+        assertNotNull(message1);
 
-        // Only pick up the 2nd messages.
-        Message message2 = consumer.receive(1000);
+        // Pick up the 2nd message.
+        Message message2 = consumer2.receive(1000);
         assertNotNull(message2);
 
         session.commit();