You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/04/17 17:36:06 UTC

svn commit: r394710 - in /incubator/activemq/trunk/activemq-core/src: main/java/org/apache/activemq/transport/stomp/Subscription.java test/java/org/apache/activemq/transport/stomp/StompTest.java

Author: chirino
Date: Mon Apr 17 08:36:04 2006
New Revision: 394710

URL: http://svn.apache.org/viewcvs?rev=394710&view=rev
Log:
Missing synchronization would cause acks not to be delivered to the broker.  After enough acks were missed,
the consumer would stop receiving messages because the broker thought the consumer's prefetch was full.

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Subscription.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Subscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Subscription.java?rev=394710&r1=394709&r2=394710&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Subscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Subscription.java Mon Apr 17 08:36:04 2006
@@ -93,7 +93,7 @@
         out.write(builder.toFrame());
     }
 
-    private void addMessageDispatch(MessageDispatch md) {
+    synchronized private void addMessageDispatch(MessageDispatch md) {
         dispatchedMessages.addLast(md);
     }
 
@@ -117,7 +117,7 @@
         return subscriptionId;
     }
 
-    public MessageAck createMessageAck(String message_id) {
+    synchronized public MessageAck createMessageAck(String message_id) {
         MessageAck ack = new MessageAck();
         ack.setDestination(consumerInfo.getDestination());
         ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
@@ -136,6 +136,7 @@
             count++;
             if( id.equals(message_id)  ) {
                 ack.setLastMessageId(md.getMessage().getMessageId());
+                break;
             }
         }
         ack.setMessageCount(count);

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java?rev=394710&r1=394709&r2=394710&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java Mon Apr 17 08:36:04 2006
@@ -96,6 +96,8 @@
             if( c < 0 ) {
                 throw new IOException("socket closed.");
             } else if( c == 0 ) {
+                c = is.read();
+                assertEquals("Expecting stomp frame to terminate with \0\n", c, '\n');
                 byte[] ba = inputBuffer.toByteArray();
                 inputBuffer.reset();
                 return new String(ba, "UTF-8");