You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2008/12/16 12:02:29 UTC
svn commit: r727017 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/broker/region/
main/java/org/apache/activemq/transport/stomp/
test/java/org/apache/activemq/transport/stomp/
Author: dejanb
Date: Tue Dec 16 03:02:28 2008
New Revision: 727017
URL: http://svn.apache.org/viewvc?rev=727017&view=rev
Log:
fix for prefetch size issue reported in AMQ-1807
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=727017&r1=727016&r2=727017&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Tue Dec 16 03:02:28 2008
@@ -243,11 +243,11 @@
// consumer
if (getPrefetchSize() != 0) {
prefetchExtension = Math.max(
- prefetchExtension, index + 1);
+ prefetchExtension, index );
}
} else {
prefetchExtension = Math.max(0,
- prefetchExtension - (index + 1));
+ prefetchExtension - index);
}
destination = node.getRegionDestination();
callDispatchMatched = true;
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java?rev=727017&r1=727016&r2=727017&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java Tue Dec 16 03:02:28 2008
@@ -112,6 +112,11 @@
headers.put("passcode", password);
StompFrame frame = new StompFrame("CONNECT", headers);
sendFrame(frame.toString());
+
+ StompFrame connect = receive();
+ if (!connect.getAction().equals(Stomp.Responses.CONNECTED)) {
+ throw new Exception ("Not connected: " + connect.getBody());
+ }
}
public void disconnect() throws Exception {
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java?rev=727017&r1=727016&r2=727017&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java Tue Dec 16 03:02:28 2008
@@ -884,7 +884,7 @@
stompConnection.sendFrame(frame);
// wait a bit for MBean to get refreshed
try {
- Thread.sleep(100);
+ Thread.sleep(200);
} catch (InterruptedException e){}
assertEquals(view.getDurableTopicSubscribers().length, 1);
@@ -892,7 +892,7 @@
frame = "DISCONNECT\nclient-id:test\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
try {
- Thread.sleep(100);
+ Thread.sleep(200);
} catch (InterruptedException e){}
//reconnect
@@ -920,17 +920,41 @@
stompConnection.begin("tx1");
stompConnection.send("/queue/" + getQueueName(), "msg", "tx1", null);
stompConnection.commit("tx1");
-
- StompFrame connect = stompConnection.receive();
- if (!connect.getAction().equals(Stomp.Responses.CONNECTED)) {
- throw new Exception ("Not connected");
- }
stompConnection.subscribe("/queue/" + getQueueName());
StompFrame stompMessage = stompConnection.receive();
assertNull(stompMessage.getHeaders().get("transaction"));
}
+ public void testPrefetchSize() throws Exception { // Subscribes with activemq.prefetchSize=1, acks the first frame inside transaction tx1, and checks that only one further frame is delivered.
+ stompConnection.connect("system", "manager");
+
+ HashMap<String, String> headers = new HashMap<String, String>();
+ headers.put("activemq.prefetchSize", "1"); // prefetch size under test, passed as a subscribe header
+ stompConnection.subscribe("/queue/" + getQueueName(), "client", headers); // NOTE(review): "client" is presumably the ack mode - confirm against StompConnection.subscribe
+
+ // send messages using JMS
+ sendMessage("message 1");
+ sendMessage("message 2");
+ sendMessage("message 3");
+
+ StompFrame frame = stompConnection.receive(); // first frame read by the test; acked below
+
+ stompConnection.begin("tx1");
+ stompConnection.ack(frame, "tx1"); // ack is issued inside tx1, which this test never commits
+
+ StompFrame frame1 = stompConnection.receive(); // second frame read by the test; never inspected or acked
+
+ try {
+ StompFrame frame2 = stompConnection.receive(500); // NOTE(review): 500 is presumably a timeout in milliseconds - confirm against StompConnection.receive
+ if (frame2 != null) {
+ fail("Should not have received the second message"); // NOTE(review): frame2 would be the third frame read by this test (after frame and frame1)
+ }
+ } catch (SocketTimeoutException soe) {} // deliberately ignored: a timeout means no further frame arrived, which is the passing outcome
+ stompDisconnect();
+
+ }
+
protected void assertClients(int expected) throws Exception {
org.apache.activemq.broker.Connection[] clients = broker.getBroker().getClients();
int actual = clients.length;