You are viewing a plain text version of this content. (The hyperlink to the canonical version was not preserved in this plain-text copy.)
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/09/08 16:56:42 UTC

svn commit: r441525 - in /incubator/activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/broker/region/ test/java/org/apache/activemq/

Author: chirino
Date: Fri Sep  8 07:56:41 2006
New Revision: 441525

URL: http://svn.apache.org/viewvc?view=rev&rev=441525
Log:
https://issues.apache.org/activemq/browse/AMQ-855

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?view=diff&rev=441525&r1=441524&r2=441525
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Fri Sep  8 07:56:41 2006
@@ -618,7 +618,7 @@
      *
      */
     protected void sendPullCommand(long timeout) throws JMSException {
-        if (info.getPrefetchSize() == 0) {
+        if (info.getPrefetchSize() == 0 && unconsumedMessages.isEmpty()) {
             MessagePull messagePull = new MessagePull();
             messagePull.configure(info);
             messagePull.setTimeout(timeout);            

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?view=diff&rev=441525&r1=441524&r2=441525
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Fri Sep  8 07:56:41 2006
@@ -185,10 +185,15 @@
                     index++;
                     acknowledge(context,ack,node);
                     if(ack.getLastMessageId().equals(messageId)){
-                        if(context.isInTransaction())
-                            prefetchExtension=Math.max(prefetchExtension,index+1);
-                        else
+                        if(context.isInTransaction()) {
+                            // extend prefetch window only if not a pulling consumer
+                            if (getPrefetchSize() != 0) {
+                                prefetchExtension=Math.max(prefetchExtension,index+1);
+                            }
+                        }
+                        else {
                             prefetchExtension=Math.max(0,prefetchExtension-(index+1));
+                        }
                         dispatchMatched();
                         return;
                     }else{

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java?view=diff&rev=441525&r1=441524&r2=441525
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java Fri Sep  8 07:56:41 2006
@@ -59,32 +59,78 @@
         Message answer = consumer.receive(5000);
         assertNotNull("Should have received a message!", answer);
         // check if method will return at all and will return a null
-        answer = consumer.receive(1000);
+        answer = consumer.receive(1);
         assertNull("Should have not received a message!", answer);
         answer = consumer.receiveNoWait();
         assertNull("Should have not received a message!", answer);
     }
 
     public void testIdleConsumer() throws Exception {
-        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        doTestIdleConsumer(false);
+    }
+    
+    public void testIdleConsumerTranscated() throws Exception {
+        doTestIdleConsumer(true);
+    }
+    
+    private void doTestIdleConsumer(boolean transacted) throws Exception {
+        Session session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
 
         MessageProducer producer = session.createProducer(queue);
         producer.send(session.createTextMessage("Msg1"));
         producer.send(session.createTextMessage("Msg2"));
-
+        if(transacted) {
+            session.commit();            
+        }
         // now lets receive it
         MessageConsumer consumer = session.createConsumer(queue);
         //noinspection UNUSED_SYMBOL
         MessageConsumer idleConsumer = session.createConsumer(queue);
         TextMessage answer = (TextMessage) consumer.receive(5000);
         assertEquals("Should have received a message!", answer.getText(), "Msg1");
+        if(transacted) {
+            session.commit();            
+        }
         // this call would return null if prefetchSize > 0
         answer = (TextMessage) consumer.receive(5000);
-        assertEquals("Should have not received a message!", answer.getText(), "Msg2");
+        assertEquals("Should have received a message!", answer.getText(), "Msg2");
+        if(transacted) {
+            session.commit();            
+        }
         answer = (TextMessage) consumer.receiveNoWait();
         assertNull("Should have not received a message!", answer);
     }
 
+    public void testRecvRecvCommit() throws Exception {
+        doTestRecvRecvCommit(false);
+    }
+    
+    public void testRecvRecvCommitTranscated() throws Exception {
+        doTestRecvRecvCommit(true);
+    }
+    
+    private void doTestRecvRecvCommit(boolean transacted) throws Exception {
+        Session session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
+
+        MessageProducer producer = session.createProducer(queue);
+        producer.send(session.createTextMessage("Msg1"));
+        producer.send(session.createTextMessage("Msg2"));
+        if(transacted) {
+            session.commit();            
+        }
+        // now lets receive it
+        MessageConsumer consumer = session.createConsumer(queue);
+        TextMessage answer = (TextMessage) consumer.receiveNoWait();
+        assertEquals("Should have received a message!", answer.getText(), "Msg1");
+        answer = (TextMessage) consumer.receiveNoWait();
+        assertEquals("Should have received a message!", answer.getText(), "Msg2");
+        if(transacted) {
+            session.commit();            
+        }
+        answer = (TextMessage) consumer.receiveNoWait();
+        assertNull("Should have not received a message!", answer);
+    }
+    
     protected void setUp() throws Exception {
         bindAddress = "tcp://localhost:61616";
         super.setUp();