You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/09/08 16:56:42 UTC
svn commit: r441525 - in /incubator/activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/ main/java/org/apache/activemq/broker/region/
test/java/org/apache/activemq/
Author: chirino
Date: Fri Sep 8 07:56:41 2006
New Revision: 441525
URL: http://svn.apache.org/viewvc?view=rev&rev=441525
Log:
https://issues.apache.org/activemq/browse/AMQ-855
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?view=diff&rev=441525&r1=441524&r2=441525
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Fri Sep 8 07:56:41 2006
@@ -618,7 +618,7 @@
*
*/
protected void sendPullCommand(long timeout) throws JMSException {
- if (info.getPrefetchSize() == 0) {
+ if (info.getPrefetchSize() == 0 && unconsumedMessages.isEmpty()) {
MessagePull messagePull = new MessagePull();
messagePull.configure(info);
messagePull.setTimeout(timeout);
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?view=diff&rev=441525&r1=441524&r2=441525
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Fri Sep 8 07:56:41 2006
@@ -185,10 +185,15 @@
index++;
acknowledge(context,ack,node);
if(ack.getLastMessageId().equals(messageId)){
- if(context.isInTransaction())
- prefetchExtension=Math.max(prefetchExtension,index+1);
- else
+ if(context.isInTransaction()) {
+ // extend prefetch window only if not a pulling consumer
+ if (getPrefetchSize() != 0) {
+ prefetchExtension=Math.max(prefetchExtension,index+1);
+ }
+ }
+ else {
prefetchExtension=Math.max(0,prefetchExtension-(index+1));
+ }
dispatchMatched();
return;
}else{
Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java?view=diff&rev=441525&r1=441524&r2=441525
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java Fri Sep 8 07:56:41 2006
@@ -59,32 +59,78 @@
Message answer = consumer.receive(5000);
assertNotNull("Should have received a message!", answer);
// check if method will return at all and will return a null
- answer = consumer.receive(1000);
+ answer = consumer.receive(1);
assertNull("Should have not received a message!", answer);
answer = consumer.receiveNoWait();
assertNull("Should have not received a message!", answer);
}
public void testIdleConsumer() throws Exception {
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ doTestIdleConsumer(false);
+ }
+
+ public void testIdleConsumerTranscated() throws Exception {
+ doTestIdleConsumer(true);
+ }
+
+ private void doTestIdleConsumer(boolean transacted) throws Exception {
+ Session session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(queue);
producer.send(session.createTextMessage("Msg1"));
producer.send(session.createTextMessage("Msg2"));
-
+ if(transacted) {
+ session.commit();
+ }
// now lets receive it
MessageConsumer consumer = session.createConsumer(queue);
//noinspection UNUSED_SYMBOL
MessageConsumer idleConsumer = session.createConsumer(queue);
TextMessage answer = (TextMessage) consumer.receive(5000);
assertEquals("Should have received a message!", answer.getText(), "Msg1");
+ if(transacted) {
+ session.commit();
+ }
// this call would return null if prefetchSize > 0
answer = (TextMessage) consumer.receive(5000);
- assertEquals("Should have not received a message!", answer.getText(), "Msg2");
+ assertEquals("Should have received a message!", answer.getText(), "Msg2");
+ if(transacted) {
+ session.commit();
+ }
answer = (TextMessage) consumer.receiveNoWait();
assertNull("Should have not received a message!", answer);
}
+ public void testRecvRecvCommit() throws Exception {
+ doTestRecvRecvCommit(false);
+ }
+
+ public void testRecvRecvCommitTranscated() throws Exception {
+ doTestRecvRecvCommit(true);
+ }
+
+ private void doTestRecvRecvCommit(boolean transacted) throws Exception {
+ Session session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer producer = session.createProducer(queue);
+ producer.send(session.createTextMessage("Msg1"));
+ producer.send(session.createTextMessage("Msg2"));
+ if(transacted) {
+ session.commit();
+ }
+ // now lets receive it
+ MessageConsumer consumer = session.createConsumer(queue);
+ TextMessage answer = (TextMessage) consumer.receiveNoWait();
+ assertEquals("Should have received a message!", answer.getText(), "Msg1");
+ answer = (TextMessage) consumer.receiveNoWait();
+ assertEquals("Should have received a message!", answer.getText(), "Msg2");
+ if(transacted) {
+ session.commit();
+ }
+ answer = (TextMessage) consumer.receiveNoWait();
+ assertNull("Should have not received a message!", answer);
+ }
+
protected void setUp() throws Exception {
bindAddress = "tcp://localhost:61616";
super.setUp();