You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2009/05/22 23:23:50 UTC

svn commit: r777716 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/broker/region/ test/java/org/apache/activemq/ test/java/org/apache/activemq/usecases/

Author: gtully
Date: Fri May 22 21:23:48 2009
New Revision: 777716

URL: http://svn.apache.org/viewvc?rev=777716&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-2262 and https://issues.apache.org/activemq/browse/AMQ-2265

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=777716&r1=777715&r2=777716&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Fri May 22 21:23:48 2009
@@ -897,13 +897,8 @@
         if ((0.5 * info.getPrefetchSize()) <= (deliveredCounter - additionalWindowSize)) {
             session.sendAck(pendingAck);
             pendingAck=null;
-            additionalWindowSize = deliveredCounter;
-
-            // When using DUPS ok, we do a real ack.
-            if (ackType == MessageAck.STANDARD_ACK_TYPE) {
-                deliveredCounter = 0;
-                additionalWindowSize = 0;
-            }
+            deliveredCounter = 0;
+            additionalWindowSize = 0;
         }
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?rev=777716&r1=777715&r2=777716&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java Fri May 22 21:23:48 2009
@@ -217,14 +217,12 @@
         } else if (ack.isDeliveredAck()) {
             // Message was delivered but not acknowledged: update pre-fetch
             // counters.
-            if (ack.isInTransaction()) {
-                if (destination != null) {
-                    destination.getDestinationStatistics().getInflight().subtract(ack.getMessageCount());
-                }
-            } else {
-                // expired message - expired message in a transacion
-                dequeueCounter.addAndGet(ack.getMessageCount());
+            // also. get these for a consumer expired message.
+            if (destination != null && !ack.isInTransaction()) {
+                destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
+                destination.getDestinationStatistics().getInflight().subtract(ack.getMessageCount());   
             }
+            dequeueCounter.addAndGet(ack.getMessageCount());
             dispatchMatched();
             return;
         }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java?rev=777716&r1=777715&r2=777716&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java Fri May 22 21:23:48 2009
@@ -864,6 +864,7 @@
     }
         
     public void testAckOfExpired() throws Exception {
+        
         ActiveMQConnectionFactory fact = new ActiveMQConnectionFactory("vm://localhost?jms.prefetchPolicy.all=4&jms.sendAcksAsync=false");
         connection = fact.createActiveMQConnection();
         
@@ -907,7 +908,9 @@
     
         DestinationViewMBean view = createView(destination);
         
-        assertTrue("Wrong inFlightCount: " + view.getInFlightCount(), (view.getDispatchCount() - view.getDequeueCount()) - view.getInFlightCount() < 5);
+        assertEquals("Wrong inFlightCount: " + view.getInFlightCount(), 0, view.getInFlightCount());
+        assertEquals("Wrong dispatch count: " + view.getDispatchCount(), 8, view.getDispatchCount());
+        assertEquals("Wrong dequeue count: " + view.getDequeueCount(), 8, view.getDequeueCount());
     }
     
     protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception {

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java?rev=777716&r1=777715&r2=777716&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java Fri May 22 21:23:48 2009
@@ -72,7 +72,7 @@
 		
 		ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
 		connection = factory.createConnection();
-		session = connection.createSession(false, session.AUTO_ACKNOWLEDGE);
+		session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 		producer = session.createProducer(destination);
 		producer.setTimeToLive(100);
 		consumer = session.createConsumer(destination);
@@ -117,7 +117,7 @@
         
         DestinationViewMBean view = createView(destination);
         
-        assertTrue("Wrong inFlightCount: " + view.getInFlightCount(), (view.getDispatchCount() - view.getDequeueCount()) - view.getInFlightCount() < 5);
+        assertEquals("Wrong inFlightCount: " + view.getInFlightCount(), view.getDispatchCount() - view.getDequeueCount(), view.getInFlightCount());
 	}
 	
 	protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception {