You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2008/12/16 12:02:29 UTC

svn commit: r727017 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/transport/stomp/ test/java/org/apache/activemq/transport/stomp/

Author: dejanb
Date: Tue Dec 16 03:02:28 2008
New Revision: 727017

URL: http://svn.apache.org/viewvc?rev=727017&view=rev
Log:
fix for prefetch size issue reported in AMQ-1807

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=727017&r1=727016&r2=727017&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Tue Dec 16 03:02:28 2008
@@ -243,11 +243,11 @@
                                 // consumer
                                 if (getPrefetchSize() != 0) {
                                     prefetchExtension = Math.max(
-                                            prefetchExtension, index + 1);
+                                            prefetchExtension, index );
                                 }
                             } else {
                                 prefetchExtension = Math.max(0,
-                                        prefetchExtension - (index + 1));
+                                        prefetchExtension - index);
                             }
                             destination = node.getRegionDestination();
                             callDispatchMatched = true;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java?rev=727017&r1=727016&r2=727017&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java Tue Dec 16 03:02:28 2008
@@ -112,6 +112,11 @@
     	headers.put("passcode", password);
     	StompFrame frame = new StompFrame("CONNECT", headers);
         sendFrame(frame.toString());
+        
+        StompFrame connect = receive();
+        if (!connect.getAction().equals(Stomp.Responses.CONNECTED)) {
+        	throw new Exception ("Not connected: " + connect.getBody());
+        }
     }
     
     public void disconnect() throws Exception {

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java?rev=727017&r1=727016&r2=727017&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java Tue Dec 16 03:02:28 2008
@@ -884,7 +884,7 @@
         stompConnection.sendFrame(frame);
         // wait a bit for MBean to get refreshed
         try {
-        	Thread.sleep(100);
+        	Thread.sleep(200);
         } catch (InterruptedException e){}
         
         assertEquals(view.getDurableTopicSubscribers().length, 1);
@@ -892,7 +892,7 @@
         frame = "DISCONNECT\nclient-id:test\n\n" + Stomp.NULL;
         stompConnection.sendFrame(frame);
         try {
-        	Thread.sleep(100);
+        	Thread.sleep(200);
         } catch (InterruptedException e){}
         
         //reconnect
@@ -920,17 +920,41 @@
         stompConnection.begin("tx1");
         stompConnection.send("/queue/" + getQueueName(), "msg", "tx1", null);
         stompConnection.commit("tx1");
-    	
-        StompFrame connect = stompConnection.receive();
-        if (!connect.getAction().equals(Stomp.Responses.CONNECTED)) {
-        	throw new Exception ("Not connected");
-        }
         
         stompConnection.subscribe("/queue/" + getQueueName());
         StompFrame stompMessage = stompConnection.receive();
         assertNull(stompMessage.getHeaders().get("transaction"));      
     }
     
+    public void testPrefetchSize() throws Exception {
+        stompConnection.connect("system", "manager");
+        
+        HashMap<String, String> headers = new HashMap<String, String>();
+        headers.put("activemq.prefetchSize", "1");
+        stompConnection.subscribe("/queue/" + getQueueName(), "client", headers);
+
+        // send messages using JMS
+        sendMessage("message 1");
+        sendMessage("message 2");
+        sendMessage("message 3");
+        
+        StompFrame frame = stompConnection.receive();
+
+        stompConnection.begin("tx1");
+        stompConnection.ack(frame, "tx1");
+
+        StompFrame frame1 = stompConnection.receive();
+        
+        try {
+        	StompFrame frame2 = stompConnection.receive(500);
+        	if (frame2 != null) {
+        		fail("Should not have received the second message");
+        	}
+        } catch (SocketTimeoutException soe) {}
+        stompDisconnect();
+    	
+    }    
+    
     protected void assertClients(int expected) throws Exception {
         org.apache.activemq.broker.Connection[] clients = broker.getBroker().getClients();
         int actual = clients.length;