You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2008/09/24 16:46:00 UTC
svn commit: r698595 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/ActiveMQMessageConsumer.java
main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
test/java/org/apache/activemq/JMSConsumerTest.java
Author: chirino
Date: Wed Sep 24 07:46:00 2008
New Revision: 698595
URL: http://svn.apache.org/viewvc?rev=698595&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-1951
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=698595&r1=698594&r2=698595&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Wed Sep 24 07:46:00 2008
@@ -125,6 +125,8 @@
private MessageTransformer transformer;
private boolean clearDispatchList;
+ private MessageAck pendingAck;
+
/**
* Create a MessageConsumer
*
@@ -615,6 +617,8 @@
ackCounter = 0;
}
}
+ } else {
+ ack = pendingAck;
}
if (ack != null) {
final MessageAck ackToSend = ack;
@@ -835,13 +839,19 @@
// The delivered message list is only needed for the recover method
// which is only used with client ack.
deliveredCounter++;
+
+ MessageAck oldPendingAck = pendingAck;
+ pendingAck = new MessageAck(md, ackType, deliveredCounter);
+ if( oldPendingAck==null ) {
+ pendingAck.setFirstMessageId(pendingAck.getLastMessageId());
+ } else {
+ pendingAck.setFirstMessageId(oldPendingAck.getFirstMessageId());
+ }
+ pendingAck.setTransactionId(session.getTransactionContext().getTransactionId());
+
if ((0.5 * info.getPrefetchSize()) <= (deliveredCounter - additionalWindowSize)) {
- MessageAck ack = new MessageAck(md, ackType, deliveredCounter);
- if( !deliveredMessages.isEmpty() ) {
- ack.setFirstMessageId(deliveredMessages.getLast().getMessage().getMessageId());
- }
- ack.setTransactionId(session.getTransactionContext().getTransactionId());
- session.sendAck(ack);
+ session.sendAck(pendingAck);
+ pendingAck=null;
additionalWindowSize = deliveredCounter;
// When using DUPS ok, we do a real ack.
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=698595&r1=698594&r2=698595&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Wed Sep 24 07:46:00 2008
@@ -216,18 +216,8 @@
throws Exception {
synchronized(dispatchLock) {
dequeueCounter++;
- node
- .getRegionDestination()
- .getDestinationStatistics()
- .getDequeues()
- .increment();
-
- node
- .getRegionDestination()
- .getDestinationStatistics()
- .getInflight()
- .decrement();
-
+ node.getRegionDestination().getDestinationStatistics().getDequeues().increment();
+ node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
prefetchExtension--;
}
}
@@ -236,6 +226,9 @@
// Need to put it back in the front.
synchronized(dispatchLock) {
dispatched.add(0, node);
+ // ActiveMQ workaround for AMQ-1730 - Please Ignore next line
+ node.incrementRedeliveryCounter();
+ node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
}
}
});
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java?rev=698595&r1=698594&r2=698595&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java Wed Sep 24 07:46:00 2008
@@ -509,22 +509,21 @@
sendMessages(session, destination, 2);
session.commit();
- // Only pick up the first message.
- Message message1 = consumer.receive(1000);
- assertNotNull(message1);
-
- // Don't acknowledge yet. This should keep our prefetch full.
+ // The prefetch should fill up with 1 message.
// Since prefetch is still full, the 2nd message should get dispatched
- // to
- // another consumer.. lets create the 2nd consumer test that it does
+ // to another consumer. Let's create the 2nd consumer and test that it does;
// make sure it does.
ActiveMQConnection connection2 = (ActiveMQConnection)factory.createConnection();
connections.add(connection2);
Session session2 = connection2.createSession(true, 0);
- session2.createConsumer(destination);
+ MessageConsumer consumer2 = session2.createConsumer(destination);
+
+ // Pick up the first message.
+ Message message1 = consumer.receive(1000);
+ assertNotNull(message1);
- // Only pick up the 2nd messages.
- Message message2 = consumer.receive(1000);
+ // Pick up the 2nd message.
+ Message message2 = consumer2.receive(1000);
assertNotNull(message2);
session.commit();