You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2009/05/22 23:23:50 UTC
svn commit: r777716 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/ main/java/org/apache/activemq/broker/region/
test/java/org/apache/activemq/ test/java/org/apache/activemq/usecases/
Author: gtully
Date: Fri May 22 21:23:48 2009
New Revision: 777716
URL: http://svn.apache.org/viewvc?rev=777716&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-2262 and https://issues.apache.org/activemq/browse/AMQ-2265
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=777716&r1=777715&r2=777716&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Fri May 22 21:23:48 2009
@@ -897,13 +897,8 @@
if ((0.5 * info.getPrefetchSize()) <= (deliveredCounter - additionalWindowSize)) {
session.sendAck(pendingAck);
pendingAck=null;
- additionalWindowSize = deliveredCounter;
-
- // When using DUPS ok, we do a real ack.
- if (ackType == MessageAck.STANDARD_ACK_TYPE) {
- deliveredCounter = 0;
- additionalWindowSize = 0;
- }
+ deliveredCounter = 0;
+ additionalWindowSize = 0;
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?rev=777716&r1=777715&r2=777716&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java Fri May 22 21:23:48 2009
@@ -217,14 +217,12 @@
} else if (ack.isDeliveredAck()) {
// Message was delivered but not acknowledged: update pre-fetch
// counters.
- if (ack.isInTransaction()) {
- if (destination != null) {
- destination.getDestinationStatistics().getInflight().subtract(ack.getMessageCount());
- }
- } else {
- // expired message - expired message in a transacion
- dequeueCounter.addAndGet(ack.getMessageCount());
+ // Delivered acks are also received for messages that expired on the consumer.
+ if (destination != null && !ack.isInTransaction()) {
+ destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
+ destination.getDestinationStatistics().getInflight().subtract(ack.getMessageCount());
}
+ dequeueCounter.addAndGet(ack.getMessageCount());
dispatchMatched();
return;
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java?rev=777716&r1=777715&r2=777716&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java Fri May 22 21:23:48 2009
@@ -864,6 +864,7 @@
}
public void testAckOfExpired() throws Exception {
+
ActiveMQConnectionFactory fact = new ActiveMQConnectionFactory("vm://localhost?jms.prefetchPolicy.all=4&jms.sendAcksAsync=false");
connection = fact.createActiveMQConnection();
@@ -907,7 +908,9 @@
DestinationViewMBean view = createView(destination);
- assertTrue("Wrong inFlightCount: " + view.getInFlightCount(), (view.getDispatchCount() - view.getDequeueCount()) - view.getInFlightCount() < 5);
+ assertEquals("Wrong inFlightCount: " + view.getInFlightCount(), 0, view.getInFlightCount());
+ assertEquals("Wrong dispatch count: " + view.getDispatchCount(), 8, view.getDispatchCount());
+ assertEquals("Wrong dequeue count: " + view.getDequeueCount(), 8, view.getDequeueCount());
}
protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception {
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java?rev=777716&r1=777715&r2=777716&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java Fri May 22 21:23:48 2009
@@ -72,7 +72,7 @@
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
connection = factory.createConnection();
- session = connection.createSession(false, session.AUTO_ACKNOWLEDGE);
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
producer = session.createProducer(destination);
producer.setTimeToLive(100);
consumer = session.createConsumer(destination);
@@ -117,7 +117,7 @@
DestinationViewMBean view = createView(destination);
- assertTrue("Wrong inFlightCount: " + view.getInFlightCount(), (view.getDispatchCount() - view.getDequeueCount()) - view.getInFlightCount() < 5);
+ assertEquals("Wrong inFlightCount: " + view.getInFlightCount(), view.getDispatchCount() - view.getDequeueCount(), view.getInFlightCount());
}
protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception {