You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/07/17 16:43:16 UTC

svn commit: r1147639 - in /activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp: StompCodec.scala StompFrame.scala StompProtocolHandler.scala

Author: chirino
Date: Sun Jul 17 14:43:16 2011
New Revision: 1147639

URL: http://svn.apache.org/viewvc?rev=1147639&view=rev
Log:
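Optimized the stomp codec flush call a bit: flush() now loops, writing until the channel accepts no more bytes or every pending buffer has been drained, instead of making a single write attempt per call.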

Modified:
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala?rev=1147639&r1=1147638&r2=1147639&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala Sun Jul 17 14:43:16 2011
@@ -31,6 +31,7 @@ import _root_.org.fusesource.hawtbuf._
 import Buffer._
 import org.apache.activemq.apollo.util._
 import org.apache.activemq.apollo.broker.store.{ZeroCopyBuffer, ZeroCopyBufferAllocator, MessageRecord}
+import org.apache.activemq.apollo.util.Log._
 
 object StompCodec extends Log {
 
@@ -252,43 +253,46 @@ class StompCodec extends ProtocolCodec {
 
 
   def flush():ProtocolCodec.BufferState = {
+    while(true) {
+      // if we have a pending write that is being sent over the socket...
+      if ( write_buffer.remaining() != 0 ) {
+        last_write_io_size = write_channel.write(write_buffer)
+        if ( last_write_io_size==0 )
+          return ProtocolCodec.BufferState.NOT_EMPTY
+        else
+          write_counter += last_write_io_size
+      } else {
+        if ( write_direct!=null ) {
+          last_write_io_size = write_direct.read(write_direct_pos, write_channel)
+          if ( last_write_io_size==0 )
+            return ProtocolCodec.BufferState.NOT_EMPTY
+          else {
+            write_direct_pos += last_write_io_size
+            write_counter += last_write_io_size
+
+            if( write_direct.remaining(write_direct_pos) == 0 ) {
+              write_direct.release
+              write_direct = null
+              write_direct_pos = 0
+              write_buffer = ByteBuffer.wrap(END_OF_FRAME_BUFFER.data)
+            }
+          }
+        } else {
+          if( next_write_buffer.size()==0 ) {
+            return ProtocolCodec.BufferState.EMPTY
+          } else {
+            // size of next buffer is based on how much was used in the previous buffer.
+            val prev_size = (write_buffer.position()+512).max(512).min(write_buffer_size)
+            write_buffer = next_write_buffer.toBuffer().toByteBuffer()
+            write_direct = next_write_direct
 
-    // if we have a pending write that is being sent over the socket...
-    if ( write_buffer.remaining() != 0 ) {
-      last_write_io_size = write_channel.write(write_buffer)
-      write_counter += last_write_io_size
-    }
-    if ( write_buffer.remaining() == 0 && write_direct!=null ) {
-      val count = write_direct.read(write_direct_pos, write_channel)
-      write_direct_pos += count
-      write_counter += count
-
-      if( write_direct.remaining(write_direct_pos) == 0 ) {
-        write_direct.release
-        write_direct = null
-        write_direct_pos = 0
-
-        write_buffer = ByteBuffer.wrap(END_OF_FRAME_BUFFER.data)
+            next_write_buffer = new DataByteArrayOutputStream(prev_size)
+            next_write_direct = null
+          }
+        }
       }
     }
-
-    // if it is now empty try to refill...
-    if ( is_empty && write_direct==null ) {
-        // size of next buffer is based on how much was used in the previous buffer.
-        val prev_size = (write_buffer.position()+512).max(512).min(write_buffer_size)
-        write_buffer = next_write_buffer.toBuffer().toByteBuffer()
-        write_direct = next_write_direct
-
-        next_write_buffer = new DataByteArrayOutputStream(prev_size)
-        next_write_direct = null
-    }
-
-    if ( is_empty ) {
-      ProtocolCodec.BufferState.EMPTY
-    } else {
-      ProtocolCodec.BufferState.NOT_EMPTY
-    }
-
+    ProtocolCodec.BufferState.NOT_EMPTY
   }
 
 
@@ -339,13 +343,15 @@ class StompCodec extends ProtocolCodec {
     while( command==null ) {
       // do we need to read in more data???
       if( read_direct!=null && read_direct.remaining(read_direct_pos) > 0) {
-        val count = read_direct.write(read_channel, read_direct_pos)
-        if (count == -1) {
+        last_read_io_size = read_direct.write(read_channel, read_direct_pos)
+
+        if (last_read_io_size == -1) {
             throw new EOFException("Peer disconnected")
-        } else if (count == 0) {
+        } else if (last_read_io_size == 0) {
             return null
         }
-        read_direct_pos += count
+        read_direct_pos += last_read_io_size
+        read_counter += last_read_io_size
       } else if (read_end == read_buffer.position() ) {
 
           // do we need a new data buffer to read data into??

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala?rev=1147639&r1=1147638&r2=1147639&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala Sun Jul 17 14:43:16 2011
@@ -179,6 +179,7 @@ object NilContent extends StompContent {
   def length = 0
   def writeTo(os:OutputStream) = {}
   val utf8 = new UTF8Buffer("")
+  override def toString = "NilContent"
 }
 
 /**

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1147639&r1=1147638&r2=1147639&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Sun Jul 17 14:43:16 2011
@@ -247,7 +247,7 @@ class StompProtocolHandler extends Proto
 
       def perform_ack(consumed:DeliveryResult, msgid: AsciiBuffer, uow:StoreUOW=null) = {
 
-        // session acks ack all previously recieved messages..
+        // session acks ack all previously received messages..
         var found = false
         val (acked, not_acked) = consumer_acks.partition{ case (id, ack)=>
           if( found ) {
@@ -261,7 +261,7 @@ class StompProtocolHandler extends Proto
         }
 
         if( acked.isEmpty ) {
-//          println("ACK failed, invalid message id: %s".format(msgid))
+          println("%s: ACK failed, invalid message id: %s, dest: %s".format(security_context.remote_address, msgid, destination.mkString(",")))
         } else {
           consumer_acks = not_acked
           acked.foreach{case (id, delivery)=>
@@ -661,7 +661,7 @@ class StompProtocolHandler extends Proto
 
               case DISCONNECT =>
 
-                val delay = send_receipt(frame.headers)
+                val delay = send_receipt(frame.headers)!=null
                 on_transport_disconnected
                 if( delay ) {
                   queue.after(die_delay, TimeUnit.MILLISECONDS) {
@@ -1218,15 +1218,16 @@ class StompProtocolHandler extends Proto
   }
 
 
-  def send_receipt(headers:HeaderMap):Boolean = {
+  def send_receipt(headers:HeaderMap) = {
     get(headers, RECEIPT_REQUESTED) match {
       case Some(receipt)=>
+        val frame = StompFrame(RECEIPT, List((RECEIPT_ID, receipt)))
         dispatchQueue <<| ^{
-          connection_sink.offer(StompFrame(RECEIPT, List((RECEIPT_ID, receipt))))
+          connection_sink.offer(frame)
         }
-        true
+        frame
       case None=>
-        false
+        null
     }
   }