You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/10/29 15:10:56 UTC

svn commit: r1403303 - in /activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp: StompCodec.scala StompFrame.scala StompProtocolHandler.scala

Author: chirino
Date: Mon Oct 29 14:10:56 2012
New Revision: 1403303

URL: http://svn.apache.org/viewvc?rev=1403303&view=rev
Log:
Fixes APLO-266: STOMP 1.2 with CRLF line endings sometimes fails with: IOException: Unable to parse header line [\u0013]

Modified:
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala?rev=1403303&r1=1403302&r2=1403303&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala Mon Oct 29 14:10:56 2012
@@ -63,7 +63,7 @@ object StompCodec extends Log {
     }
 
     // we can optimize a little if the headers and content are in the same buffer..
-    if( frame.are_headers_in_content_buffer ) {
+    if( frame.are_headers_in_content_buffer && frame.contiguous ) {
 
       val offset = frame.headers.head._1.offset;
       val buffer1 = frame.headers.head._1;
@@ -152,9 +152,9 @@ object StompCodec extends Log {
     }
 
     if( direct_buffer==null ) {
-      new StompFrame(action, headers.toList, BufferContent(buffer))
+      new StompFrame(action, headers.toList, BufferContent(buffer), true)
     } else {
-      new StompFrame(action, headers.toList, ZeroCopyContent(direct_buffer))
+      new StompFrame(action, headers.toList, ZeroCopyContent(direct_buffer), true)
     }
   }
 
@@ -188,7 +188,7 @@ class StompCodec extends AbstractProtoco
     }
 
     // we can optimize a little if the headers and content are in the same buffer..
-    if( frame.are_headers_in_content_buffer ) {
+    if( frame.are_headers_in_content_buffer && frame.contiguous) {
 
       val offset = frame.headers.head._1.offset;
       val buffer1 = frame.headers.head._1;
@@ -229,13 +229,15 @@ class StompCodec extends AbstractProtoco
       var line = readUntil(NEWLINE, max_command_length, "The maximum command length was exceeded")
       if (line != null) {
         var action = line.moveTail(-1)
+        var contiguous = true
         if (trim) {
           action = action.trim
         } else if( trim_cr && action.length > 0 && action.get(action.length-1)=='\r'.toByte ) {
           action.moveTail(-1)
+          contiguous = false
         }
         if (action.length > 0) {
-          nextDecodeAction = read_headers(action.ascii)
+          nextDecodeAction = read_headers(action.ascii, contiguous)
           return nextDecodeAction();
         }
       }
@@ -243,9 +245,10 @@ class StompCodec extends AbstractProtoco
     }
   }
 
-  private def read_headers(command: AsciiBuffer): AbstractProtocolCodec.Action = new AbstractProtocolCodec.Action {
+  private def read_headers(command: AsciiBuffer, c:Boolean): AbstractProtocolCodec.Action = new AbstractProtocolCodec.Action {
     var contentLength:AsciiBuffer = _
     val headers = new ListBuffer[(AsciiBuffer, AsciiBuffer)]()
+    var contiguous = c;
 
     def apply: AnyRef = {
       var line = readUntil(NEWLINE, max_header_length, "The maximum header length was exceeded")
@@ -255,6 +258,7 @@ class StompCodec extends AbstractProtoco
         line.moveTail(-1)
         // 1.0 and 1.2 spec trims off the \r
         if ( (trim || trim_cr) && line.length > 0 && line.get(line.length-1)=='\r'.toByte ) {
+          contiguous = false
           line.moveTail(-1)
         }
 
@@ -297,9 +301,9 @@ class StompCodec extends AbstractProtoco
             if (max_data_length != -1 && length > max_data_length) {
               throw new IOException("The maximum data length was exceeded")
             }
-            nextDecodeAction = read_binary_body(command, h, length)
+            nextDecodeAction = read_binary_body(command, h, length, contiguous)
           } else {
-            nextDecodeAction = read_text_body(command, h)
+            nextDecodeAction = read_text_body(command, h, contiguous)
           }
           return nextDecodeAction.apply()
         }
@@ -308,7 +312,7 @@ class StompCodec extends AbstractProtoco
     }
   }
 
-  private def read_binary_body(command: AsciiBuffer, headers:HeaderMap, contentLength: Int): AbstractProtocolCodec.Action = {
+  private def read_binary_body(command: AsciiBuffer, headers:HeaderMap, contentLength: Int, contiguous:Boolean): AbstractProtocolCodec.Action = {
     return new AbstractProtocolCodec.Action {
       def apply: AnyRef = {
         var content = readBytes(contentLength + 1)
@@ -319,7 +323,7 @@ class StompCodec extends AbstractProtoco
           nextDecodeAction = read_action
           content.moveTail(-1)
           val body = if( content.length() == 0) NilContent else BufferContent(content)
-          return new StompFrame(command, headers, body)
+          return new StompFrame(command, headers, body, contiguous)
         }
         else {
           return null
@@ -328,7 +332,7 @@ class StompCodec extends AbstractProtoco
     }
   }
 
-  private def read_text_body(command: AsciiBuffer, headers:HeaderMap): AbstractProtocolCodec.Action = {
+  private def read_text_body(command: AsciiBuffer, headers:HeaderMap, contiguous:Boolean): AbstractProtocolCodec.Action = {
     return new AbstractProtocolCodec.Action {
       def apply: AnyRef = {
         var content: Buffer = readUntil(0.asInstanceOf[Byte])
@@ -336,7 +340,7 @@ class StompCodec extends AbstractProtoco
           nextDecodeAction = read_action
           content.moveTail(-1)
           val body = if( content.length() == 0) NilContent else BufferContent(content)
-          return new StompFrame(command, headers, body)
+          return new StompFrame(command, headers, body, contiguous)
         }
         else {
           return null

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala?rev=1403303&r1=1403302&r2=1403303&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala Mon Oct 29 14:10:56 2012
@@ -229,7 +229,7 @@ case class ZeroCopyContent(zero_copy_buf
  *
  * @author <a href="http://hiramchirino.com">chirino</a>
  */
-case class StompFrame(action:AsciiBuffer, headers:HeaderMap=Nil, content:StompContent=NilContent, updated_headers:HeaderMap=Nil) {
+case class StompFrame(action:AsciiBuffer, headers:HeaderMap=Nil, content:StompContent=NilContent, contiguous:Boolean=false, updated_headers:HeaderMap=Nil) {
 
   def size_of_updated_headers = {
     size_of(updated_headers)
@@ -267,17 +267,18 @@ case class StompFrame(action:AsciiBuffer
           ( headers.head._1.data eq content.asInstanceOf[BufferContent].content.data )
 
   def size:Int = {
-     content match {
-       case x:BufferContent =>
-         if( (action.data eq x.content.data) && updated_headers==Nil ) {
-            return (x.content.offset-action.offset)+x.content.length
-         }
-       case _ =>
-     }
-
-     action.length + 1 +
-     size_of_updated_headers +
-     size_of_original_headers + 1 + content.length
+    if( contiguous ) {
+      content match {
+        case x:BufferContent =>
+          if( (action.data eq x.content.data) && updated_headers==Nil ) {
+             return (x.content.offset-action.offset)+x.content.length
+          }
+        case _ =>
+      }
+    }
+    action.length + 1 +
+    size_of_updated_headers +
+    size_of_original_headers + 1 + content.length
   }
 
   def header(name:AsciiBuffer) = {
@@ -286,7 +287,7 @@ case class StompFrame(action:AsciiBuffer
     ).map(_._2).getOrElse(null)
   }
 
-  def append_headers(value:HeaderMap) = StompFrame(action, headers, content, value ::: updated_headers)
+  def append_headers(value:HeaderMap) = StompFrame(action, headers, content, contiguous, value ::: updated_headers)
 
   def retain = content.retain
   def release = content.release

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1403303&r1=1403302&r2=1403303&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Mon Oct 29 14:10:56 2012
@@ -1300,9 +1300,9 @@ class StompProtocolHandler extends Proto
       // We may need to add some headers..
       var message = updated_headers(addresses, frame.headers) match {
         case Nil=>
-          StompFrameMessage(StompFrame(MESSAGE, frame.headers, frame.content))
+          StompFrameMessage(StompFrame(MESSAGE, frame.headers, frame.content, frame.contiguous))
         case updated_headers =>
-          StompFrameMessage(StompFrame(MESSAGE, frame.headers, frame.content, updated_headers))
+          StompFrameMessage(StompFrame(MESSAGE, frame.headers, frame.content, frame.contiguous, updated_headers))
       }
 
       val delivery = new Delivery