You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/07/17 16:43:16 UTC
svn commit: r1147639 - in
/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp:
StompCodec.scala StompFrame.scala StompProtocolHandler.scala
Author: chirino
Date: Sun Jul 17 14:43:16 2011
New Revision: 1147639
URL: http://svn.apache.org/viewvc?rev=1147639&view=rev
Log:
Optimized the stomp codec flush call a bit.
Modified:
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala?rev=1147639&r1=1147638&r2=1147639&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala Sun Jul 17 14:43:16 2011
@@ -31,6 +31,7 @@ import _root_.org.fusesource.hawtbuf._
import Buffer._
import org.apache.activemq.apollo.util._
import org.apache.activemq.apollo.broker.store.{ZeroCopyBuffer, ZeroCopyBufferAllocator, MessageRecord}
+import org.apache.activemq.apollo.util.Log._
object StompCodec extends Log {
@@ -252,43 +253,46 @@ class StompCodec extends ProtocolCodec {
def flush():ProtocolCodec.BufferState = {
+ while(true) {
+ // if we have a pending write that is being sent over the socket...
+ if ( write_buffer.remaining() != 0 ) {
+ last_write_io_size = write_channel.write(write_buffer)
+ if ( last_write_io_size==0 )
+ return ProtocolCodec.BufferState.NOT_EMPTY
+ else
+ write_counter += last_write_io_size
+ } else {
+ if ( write_direct!=null ) {
+ last_write_io_size = write_direct.read(write_direct_pos, write_channel)
+ if ( last_write_io_size==0 )
+ return ProtocolCodec.BufferState.NOT_EMPTY
+ else {
+ write_direct_pos += last_write_io_size
+ write_counter += last_write_io_size
+
+ if( write_direct.remaining(write_direct_pos) == 0 ) {
+ write_direct.release
+ write_direct = null
+ write_direct_pos = 0
+ write_buffer = ByteBuffer.wrap(END_OF_FRAME_BUFFER.data)
+ }
+ }
+ } else {
+ if( next_write_buffer.size()==0 ) {
+ return ProtocolCodec.BufferState.EMPTY
+ } else {
+ // size of next buffer is based on how much was used in the previous buffer.
+ val prev_size = (write_buffer.position()+512).max(512).min(write_buffer_size)
+ write_buffer = next_write_buffer.toBuffer().toByteBuffer()
+ write_direct = next_write_direct
- // if we have a pending write that is being sent over the socket...
- if ( write_buffer.remaining() != 0 ) {
- last_write_io_size = write_channel.write(write_buffer)
- write_counter += last_write_io_size
- }
- if ( write_buffer.remaining() == 0 && write_direct!=null ) {
- val count = write_direct.read(write_direct_pos, write_channel)
- write_direct_pos += count
- write_counter += count
-
- if( write_direct.remaining(write_direct_pos) == 0 ) {
- write_direct.release
- write_direct = null
- write_direct_pos = 0
-
- write_buffer = ByteBuffer.wrap(END_OF_FRAME_BUFFER.data)
+ next_write_buffer = new DataByteArrayOutputStream(prev_size)
+ next_write_direct = null
+ }
+ }
}
}
-
- // if it is now empty try to refill...
- if ( is_empty && write_direct==null ) {
- // size of next buffer is based on how much was used in the previous buffer.
- val prev_size = (write_buffer.position()+512).max(512).min(write_buffer_size)
- write_buffer = next_write_buffer.toBuffer().toByteBuffer()
- write_direct = next_write_direct
-
- next_write_buffer = new DataByteArrayOutputStream(prev_size)
- next_write_direct = null
- }
-
- if ( is_empty ) {
- ProtocolCodec.BufferState.EMPTY
- } else {
- ProtocolCodec.BufferState.NOT_EMPTY
- }
-
+ ProtocolCodec.BufferState.NOT_EMPTY
}
@@ -339,13 +343,15 @@ class StompCodec extends ProtocolCodec {
while( command==null ) {
// do we need to read in more data???
if( read_direct!=null && read_direct.remaining(read_direct_pos) > 0) {
- val count = read_direct.write(read_channel, read_direct_pos)
- if (count == -1) {
+ last_read_io_size = read_direct.write(read_channel, read_direct_pos)
+
+ if (last_read_io_size == -1) {
throw new EOFException("Peer disconnected")
- } else if (count == 0) {
+ } else if (last_read_io_size == 0) {
return null
}
- read_direct_pos += count
+ read_direct_pos += last_read_io_size
+ read_counter += last_read_io_size
} else if (read_end == read_buffer.position() ) {
// do we need a new data buffer to read data into??
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala?rev=1147639&r1=1147638&r2=1147639&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala Sun Jul 17 14:43:16 2011
@@ -179,6 +179,7 @@ object NilContent extends StompContent {
def length = 0
def writeTo(os:OutputStream) = {}
val utf8 = new UTF8Buffer("")
+ override def toString = "NilContent"
}
/**
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1147639&r1=1147638&r2=1147639&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Sun Jul 17 14:43:16 2011
@@ -247,7 +247,7 @@ class StompProtocolHandler extends Proto
def perform_ack(consumed:DeliveryResult, msgid: AsciiBuffer, uow:StoreUOW=null) = {
- // session acks ack all previously recieved messages..
+ // session acks ack all previously received messages..
var found = false
val (acked, not_acked) = consumer_acks.partition{ case (id, ack)=>
if( found ) {
@@ -261,7 +261,7 @@ class StompProtocolHandler extends Proto
}
if( acked.isEmpty ) {
-// println("ACK failed, invalid message id: %s".format(msgid))
+ println("%s: ACK failed, invalid message id: %s, dest: %s".format(security_context.remote_address, msgid, destination.mkString(",")))
} else {
consumer_acks = not_acked
acked.foreach{case (id, delivery)=>
@@ -661,7 +661,7 @@ class StompProtocolHandler extends Proto
case DISCONNECT =>
- val delay = send_receipt(frame.headers)
+ val delay = send_receipt(frame.headers)!=null
on_transport_disconnected
if( delay ) {
queue.after(die_delay, TimeUnit.MILLISECONDS) {
@@ -1218,15 +1218,16 @@ class StompProtocolHandler extends Proto
}
- def send_receipt(headers:HeaderMap):Boolean = {
+ def send_receipt(headers:HeaderMap) = {
get(headers, RECEIPT_REQUESTED) match {
case Some(receipt)=>
+ val frame = StompFrame(RECEIPT, List((RECEIPT_ID, receipt)))
dispatchQueue <<| ^{
- connection_sink.offer(StompFrame(RECEIPT, List((RECEIPT_ID, receipt))))
+ connection_sink.offer(frame)
}
- true
+ frame
case None=>
- false
+ null
}
}