You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2010/01/20 17:44:11 UTC

svn commit: r901271 - in /activemq/branches/activemq-5.3/activemq-core/src: main/java/org/apache/activemq/broker/region/PrefetchSubscription.java test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java

Author: gtully
Date: Wed Jan 20 16:44:11 2010
New Revision: 901271

URL: http://svn.apache.org/viewvc?rev=901271&view=rev
Log:
svn merge -c 901269 https://svn.apache.org/repos/asf/activemq/trunk - resolve https://issues.apache.org/activemq/browse/AMQ-2567

Modified:
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=901271&r1=901270&r2=901271&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Wed Jan 20 16:44:11 2010
@@ -252,20 +252,13 @@
                         }
                         index++;
                         acknowledge(context, ack, node);
-                        if (ack.getLastMessageId().equals(messageId)) {
-                            
-                            if (context.isInTransaction()) {
-                                // extend prefetch window only if not a pulling
-                                // consumer
-                                if (getPrefetchSize() != 0) {
-                                    prefetchExtension = Math.max(
-                                            prefetchExtension, index );
-                                }
-                            } else {
-                                // contract prefetch if dispatch required a pull
-                                if (getPrefetchSize() == 0) {
-                                    prefetchExtension = Math.max(0, prefetchExtension - index);
-                                }
+                        if (ack.getLastMessageId().equals(messageId)) {                  
+                            // contract prefetch if dispatch required a pull
+                            if (getPrefetchSize() == 0) {
+                                prefetchExtension = Math.max(0, prefetchExtension - index);
+                            } else if (context.isInTransaction()) {
+                                // extend prefetch window only if not a pulling consumer
+                                prefetchExtension = Math.max(prefetchExtension, index);
                             }
                             destination = node.getRegionDestination();
                             callDispatchMatched = true;

Modified: activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java?rev=901271&r1=901270&r2=901271&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java Wed Jan 20 16:44:11 2010
@@ -155,15 +155,178 @@
         answer = (TextMessage)consumer2.receiveNoWait();
         assertNull("Should have not received a message!", answer);
     }
+    
+    // https://issues.apache.org/activemq/browse/AMQ-2567
+    public void testManyMessageConsumer() throws Exception {
+        doTestManyMessageConsumer(true);
+    }
+
+    public void testManyMessageConsumerNoTransaction() throws Exception {
+        doTestManyMessageConsumer(false);
+    }
+    
+    private void doTestManyMessageConsumer(boolean transacted) throws Exception {
+        Session session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
+
+        MessageProducer producer = session.createProducer(queue);
+        producer.send(session.createTextMessage("Msg1"));
+        producer.send(session.createTextMessage("Msg2"));
+        producer.send(session.createTextMessage("Msg3"));
+        producer.send(session.createTextMessage("Msg4"));
+        producer.send(session.createTextMessage("Msg5"));
+        producer.send(session.createTextMessage("Msg6"));
+        producer.send(session.createTextMessage("Msg7"));
+        producer.send(session.createTextMessage("Msg8"));
+        if (transacted) {
+            session.commit();
+        }
+        // now lets receive it
+        MessageConsumer consumer = session.createConsumer(queue);
+        
+        MessageConsumer consumer2  = session.createConsumer(queue);
+        TextMessage answer = (TextMessage)consumer.receive(5000);
+        assertEquals("Should have received a message!", answer.getText(), "Msg1");
+        if (transacted) {
+            session.commit();
+        }
+        answer = (TextMessage)consumer.receive(5000);
+        assertEquals("Should have received a message!", answer.getText(), "Msg2");
+        if (transacted) {
+            session.commit();
+        }
+        answer = (TextMessage)consumer.receive(5000);
+        assertEquals("Should have received a message!", answer.getText(), "Msg3");
+        if (transacted) {
+            session.commit();
+        }
+        // this call would return null if prefetchSize > 0
+        answer = (TextMessage)consumer.receive(5000);
+        assertEquals("Should have received a message!", answer.getText(), "Msg4");
+        if (transacted) {
+            session.commit();
+        }
+        // Now using other consumer 
+        // this call should return the next message (Msg5) still left on the queue
+        answer = (TextMessage)consumer2.receive(5000);
+        assertEquals("Should have received a message!", answer.getText(), "Msg5");
+        if (transacted) {
+            session.commit();
+        }
+        // Now using other consumer 
+        // this call should return the next message (Msg5) still left on the queue
+        answer = (TextMessage)consumer.receive(5000);
+        assertEquals("Should have received a message!", answer.getText(), "Msg6");
+        // read one more message without commit
+        // Now using other consumer 
+        // this call should return the next message (Msg5) still left on the queue
+        answer = (TextMessage)consumer.receive(5000);
+        assertEquals("Should have received a message!", answer.getText(), "Msg7");
+        if (transacted) {
+            session.commit();
+        }
+        // Now using other consumer 
+        // this call should return the next message (Msg5) still left on the queue
+        answer = (TextMessage)consumer2.receive(5000);
+        assertEquals("Should have received a message!", answer.getText(), "Msg8");
+        if (transacted) {
+            session.commit();
+        }
+        answer = (TextMessage)consumer.receiveNoWait();
+        assertNull("Should have not received a message!", answer);
+    }
+
+    public void testManyMessageConsumerWithSend() throws Exception {
+        doTestManyMessageConsumerWithSend(true);
+    }
+
+    public void testManyMessageConsumerWithSendNoTransaction() throws Exception {
+        doTestManyMessageConsumerWithSend(false);
+    }
+    
+    private void doTestManyMessageConsumerWithSend(boolean transacted) throws Exception {
+        Session session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
+
+        MessageProducer producer = session.createProducer(queue);
+        producer.send(session.createTextMessage("Msg1"));
+        producer.send(session.createTextMessage("Msg2"));
+        producer.send(session.createTextMessage("Msg3"));
+        producer.send(session.createTextMessage("Msg4"));
+        producer.send(session.createTextMessage("Msg5"));
+        producer.send(session.createTextMessage("Msg6"));
+        producer.send(session.createTextMessage("Msg7"));
+        producer.send(session.createTextMessage("Msg8"));
+        if (transacted) {
+            session.commit();
+        }
+        // now lets receive it
+        MessageConsumer consumer = session.createConsumer(queue);
+        
+        MessageConsumer consumer2  = session.createConsumer(queue);
+        TextMessage answer = (TextMessage)consumer.receive(5000);
+        assertEquals("Should have received a message!", answer.getText(), "Msg1");
+        if (transacted) {
+            session.commit();
+        }
+        answer = (TextMessage)consumer.receive(5000);
+        assertEquals("Should have received a message!", answer.getText(), "Msg2");
+        if (transacted) {
+            session.commit();
+        }
+        answer = (TextMessage)consumer.receive(5000);
+        assertEquals("Should have received a message!", answer.getText(), "Msg3");
+        if (transacted) {
+            session.commit();
+        }
+        // Now using other consumer take 2
+        answer = (TextMessage)consumer2.receive(5000);
+        assertEquals("Should have received a message!", answer.getText(), "Msg4");
+        answer = (TextMessage)consumer2.receive(5000);
+        assertEquals("Should have received a message!", answer.getText(), "Msg5");
 
+        // ensure prefetch extension ok by sending another that could get dispatched
+        producer.send(session.createTextMessage("Msg9"));
+        if (transacted) {
+            session.commit();
+        }
+        
+        answer = (TextMessage)consumer.receive(5000);
+        assertEquals("Should have received a message!", answer.getText(), "Msg6");
+        // read one more message without commit
+        // and using other consumer 
+        answer = (TextMessage)consumer2.receive(5000);
+        assertEquals("Should have received a message!", answer.getText(), "Msg7");
+        if (transacted) {
+            session.commit();
+        }
+        
+        answer = (TextMessage)consumer2.receive(5000);
+        assertEquals("Should have received a message!", answer.getText(), "Msg8");
+        if (transacted) {
+            session.commit();
+        }
+        
+        answer = (TextMessage)consumer.receive(5000);
+        assertEquals("Should have received a message!", answer.getText(), "Msg9");
+        if (transacted) {
+            session.commit();
+        }
+        answer = (TextMessage)consumer.receiveNoWait();
+        assertNull("Should have not received a message!", answer);
+    }
+    
     protected void setUp() throws Exception {
-        bindAddress = "tcp://localhost:61616";
+        bindAddress = "tcp://localhost:0";
         super.setUp();
 
         connection = createConnection();
         connection.start();
         queue = createQueue();
     }
+    
+    protected void startBroker() throws Exception {
+        super.startBroker();
+        bindAddress = broker.getTransportConnectors().get(0).getConnectUri().toString();
+    }
 
     protected void tearDown() throws Exception {
         connection.close();