You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/07/18 04:46:18 UTC
svn commit: r1147725 - in /activemq/activemq-apollo/trunk:
apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
pom.xml
Author: chirino
Date: Mon Jul 18 02:46:17 2011
New Revision: 1147725
URL: http://svn.apache.org/viewvc?rev=1147725&view=rev
Log:
Switch to hawtbuf 1.6, which provides safer methods (such as moveHead) for moving the head position in a Buffer.
Modified:
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
activemq/activemq-apollo/trunk/pom.xml
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1147725&r1=1147724&r2=1147725&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Mon Jul 18 02:46:17 2011
@@ -91,16 +91,16 @@ class StompProtocolHandler extends Proto
while( pos.offset < max ) {
if( pos.startsWith(ESCAPE_ESCAPE_SEQ) ) {
rc.write(ESCAPE)
- pos.offset += 2
+ pos.moveHead(2)
} else if( pos.startsWith(COLON_ESCAPE_SEQ) ) {
rc.write(COLON)
- pos.offset += 2
+ pos.moveHead(2)
} else if( pos.startsWith(NEWLINE_ESCAPE_SEQ) ) {
rc.write(NEWLINE)
- pos.offset += 2
+ pos.moveHead(2)
} else {
rc.write(pos.data(pos.offset))
- pos.offset += 1
+ pos.moveHead(1)
}
}
new String(rc.toByteArray, "UTF-8")
@@ -209,27 +209,23 @@ class StompProtocolHandler extends Proto
var consumer_acks = ListBuffer[(AsciiBuffer, TrackedAck)]()
def track(delivery:Delivery) = {
- queue.apply {
- if( protocol_version eq V1_0 ) {
- // register on the connection since 1.0 acks may not include the subscription id
- connection_ack_handlers += ( delivery.message.id-> this )
- }
- consumer_acks += delivery.message.id -> new TrackedAck(Some(delivery.size), delivery.ack )
+ queue.assertExecuting()
+ if( protocol_version eq V1_0 ) {
+ // register on the connection since 1.0 acks may not include the subscription id
+ connection_ack_handlers += ( delivery.message.id-> this )
}
+ consumer_acks += delivery.message.id -> new TrackedAck(Some(delivery.size), delivery.ack )
}
def credit(msgid: AsciiBuffer, credit_value: (Int, Int)):Unit = {
+ queue.assertExecuting()
if( initial_credit_window._3 ) {
var found = false
val (acked, not_acked) = consumer_acks.partition{ case (id, ack)=>
- if( found ) {
- false
- } else {
- if( id == msgid ) {
- found = true
- }
- true
+ if( id == msgid ) {
+ found = true
}
+ found
}
for( (id, delivery) <- acked ) {
@@ -246,18 +242,15 @@ class StompProtocolHandler extends Proto
}
def perform_ack(consumed:DeliveryResult, msgid: AsciiBuffer, uow:StoreUOW=null) = {
+ queue.assertExecuting()
// session acks ack all previously received messages..
var found = false
val (acked, not_acked) = consumer_acks.partition{ case (id, ack)=>
- if( found ) {
- false
- } else {
- if( id == msgid ) {
- found = true
- }
- true
+ if( id == msgid ) {
+ found = true
}
+ found
}
if( acked.isEmpty ) {
@@ -282,16 +275,16 @@ class StompProtocolHandler extends Proto
var consumer_acks = HashMap[AsciiBuffer, TrackedAck]()
def track(delivery:Delivery) = {
- queue.apply {
- if( protocol_version eq V1_0 ) {
- // register on the connection since 1.0 acks may not include the subscription id
- connection_ack_handlers += ( delivery.message.id-> this )
- }
- consumer_acks += delivery.message.id -> new TrackedAck(Some(delivery.size), delivery.ack)
+ queue.assertExecuting();
+ if( protocol_version eq V1_0 ) {
+ // register on the connection since 1.0 acks may not include the subscription id
+ connection_ack_handlers += ( delivery.message.id-> this )
}
+ consumer_acks += delivery.message.id -> new TrackedAck(Some(delivery.size), delivery.ack)
}
def credit(msgid: AsciiBuffer, credit_value: (Int, Int)):Unit = {
+ queue.assertExecuting()
if( initial_credit_window._3 ) {
for( delivery <- consumer_acks.get(msgid)) {
for( credit <- delivery.credit ) {
@@ -307,6 +300,7 @@ class StompProtocolHandler extends Proto
}
def perform_ack(consumed:DeliveryResult, msgid: AsciiBuffer, uow:StoreUOW=null) = {
+ queue.assertExecuting()
consumer_acks.remove(msgid) match {
case Some(delivery) =>
if( delivery.ack!=null ) {
Modified: activemq/activemq-apollo/trunk/pom.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/pom.xml?rev=1147725&r1=1147724&r2=1147725&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/pom.xml (original)
+++ activemq/activemq-apollo/trunk/pom.xml Mon Jul 18 02:46:17 2011
@@ -96,8 +96,8 @@
<xbean-version>3.4</xbean-version>
<felix-version>1.0.0</felix-version>
- <hawtbuf-version>1.5</hawtbuf-version>
<hawtdispatch-version>1.4-SNAPSHOT</hawtdispatch-version>
+ <hawtbuf-version>1.6-SNAPSHOT</hawtbuf-version>
<jdbm-version>2.0.1</jdbm-version>
<bdb-version>4.1.10</bdb-version>