You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/07/18 04:46:18 UTC

svn commit: r1147725 - in /activemq/activemq-apollo/trunk: apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala pom.xml

Author: chirino
Date: Mon Jul 18 02:46:17 2011
New Revision: 1147725

URL: http://svn.apache.org/viewvc?rev=1147725&view=rev
Log:
Switch to hawtbuf 1.6-SNAPSHOT, which provides safer methods (e.g. moveHead) for moving the head position in a Buffer.

Modified:
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
    activemq/activemq-apollo/trunk/pom.xml

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1147725&r1=1147724&r2=1147725&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Mon Jul 18 02:46:17 2011
@@ -91,16 +91,16 @@ class StompProtocolHandler extends Proto
     while( pos.offset < max ) {
       if( pos.startsWith(ESCAPE_ESCAPE_SEQ) ) {
         rc.write(ESCAPE)
-        pos.offset += 2
+        pos.moveHead(2)
       } else if( pos.startsWith(COLON_ESCAPE_SEQ) ) {
         rc.write(COLON)
-        pos.offset += 2
+        pos.moveHead(2)
       } else if( pos.startsWith(NEWLINE_ESCAPE_SEQ) ) {
         rc.write(NEWLINE)
-        pos.offset += 2
+        pos.moveHead(2)
       } else {
         rc.write(pos.data(pos.offset))
-        pos.offset += 1
+        pos.moveHead(1)
       }
     }
     new String(rc.toByteArray, "UTF-8")
@@ -209,27 +209,23 @@ class StompProtocolHandler extends Proto
       var consumer_acks = ListBuffer[(AsciiBuffer, TrackedAck)]()
 
       def track(delivery:Delivery) = {
-        queue.apply {
-          if( protocol_version eq V1_0 ) {
-            // register on the connection since 1.0 acks may not include the subscription id
-            connection_ack_handlers += ( delivery.message.id-> this )
-          }
-          consumer_acks += delivery.message.id -> new TrackedAck(Some(delivery.size), delivery.ack )
+        queue.assertExecuting()
+        if( protocol_version eq V1_0 ) {
+          // register on the connection since 1.0 acks may not include the subscription id
+          connection_ack_handlers += ( delivery.message.id-> this )
         }
+        consumer_acks += delivery.message.id -> new TrackedAck(Some(delivery.size), delivery.ack )
       }
 
       def credit(msgid: AsciiBuffer, credit_value: (Int, Int)):Unit = {
+        queue.assertExecuting()
         if( initial_credit_window._3 ) {
           var found = false
           val (acked, not_acked) = consumer_acks.partition{ case (id, ack)=>
-            if( found ) {
-              false
-            } else {
-              if( id == msgid ) {
-                found = true
-              }
-              true
+            if( id == msgid ) {
+              found = true
             }
+            found
           }
 
           for( (id, delivery) <- acked ) {
@@ -246,18 +242,15 @@ class StompProtocolHandler extends Proto
       }
 
       def perform_ack(consumed:DeliveryResult, msgid: AsciiBuffer, uow:StoreUOW=null) = {
+        queue.assertExecuting()
 
         // session acks ack all previously received messages..
         var found = false
         val (acked, not_acked) = consumer_acks.partition{ case (id, ack)=>
-          if( found ) {
-            false
-          } else {
-            if( id == msgid ) {
-              found = true
-            }
-            true
+          if( id == msgid ) {
+            found = true
           }
+          found
         }
 
         if( acked.isEmpty ) {
@@ -282,16 +275,16 @@ class StompProtocolHandler extends Proto
       var consumer_acks = HashMap[AsciiBuffer, TrackedAck]()
 
       def track(delivery:Delivery) = {
-        queue.apply {
-          if( protocol_version eq V1_0 ) {
-            // register on the connection since 1.0 acks may not include the subscription id
-            connection_ack_handlers += ( delivery.message.id-> this )
-          }
-          consumer_acks += delivery.message.id -> new TrackedAck(Some(delivery.size), delivery.ack)
+        queue.assertExecuting();
+        if( protocol_version eq V1_0 ) {
+          // register on the connection since 1.0 acks may not include the subscription id
+          connection_ack_handlers += ( delivery.message.id-> this )
         }
+        consumer_acks += delivery.message.id -> new TrackedAck(Some(delivery.size), delivery.ack)
       }
 
       def credit(msgid: AsciiBuffer, credit_value: (Int, Int)):Unit = {
+        queue.assertExecuting()
         if( initial_credit_window._3 ) {
           for( delivery <- consumer_acks.get(msgid)) {
             for( credit <- delivery.credit ) {
@@ -307,6 +300,7 @@ class StompProtocolHandler extends Proto
       }
 
       def perform_ack(consumed:DeliveryResult, msgid: AsciiBuffer, uow:StoreUOW=null) = {
+        queue.assertExecuting()
         consumer_acks.remove(msgid) match {
           case Some(delivery) =>
             if( delivery.ack!=null ) {

Modified: activemq/activemq-apollo/trunk/pom.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/pom.xml?rev=1147725&r1=1147724&r2=1147725&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/pom.xml (original)
+++ activemq/activemq-apollo/trunk/pom.xml Mon Jul 18 02:46:17 2011
@@ -96,8 +96,8 @@
     <xbean-version>3.4</xbean-version>
     <felix-version>1.0.0</felix-version>
 
-    <hawtbuf-version>1.5</hawtbuf-version>
     <hawtdispatch-version>1.4-SNAPSHOT</hawtdispatch-version>
+    <hawtbuf-version>1.6-SNAPSHOT</hawtbuf-version>
     
     <jdbm-version>2.0.1</jdbm-version>
     <bdb-version>4.1.10</bdb-version>