You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/10/29 15:10:56 UTC
svn commit: r1403303 - in /activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp: StompCodec.scala StompFrame.scala StompProtocolHandler.scala
Author: chirino
Date: Mon Oct 29 14:10:56 2012
New Revision: 1403303
URL: http://svn.apache.org/viewvc?rev=1403303&view=rev
Log:
Fixes APLO-266: Stomp 1.2 with CRLF, sometimes gets: IOException: Unable to parse header line [\u0013]
Modified:
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala?rev=1403303&r1=1403302&r2=1403303&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala Mon Oct 29 14:10:56 2012
@@ -63,7 +63,7 @@ object StompCodec extends Log {
}
// we can optimize a little if the headers and content are in the same buffer..
- if( frame.are_headers_in_content_buffer ) {
+ if( frame.are_headers_in_content_buffer && frame.contiguous ) {
val offset = frame.headers.head._1.offset;
val buffer1 = frame.headers.head._1;
@@ -152,9 +152,9 @@ object StompCodec extends Log {
}
if( direct_buffer==null ) {
- new StompFrame(action, headers.toList, BufferContent(buffer))
+ new StompFrame(action, headers.toList, BufferContent(buffer), true)
} else {
- new StompFrame(action, headers.toList, ZeroCopyContent(direct_buffer))
+ new StompFrame(action, headers.toList, ZeroCopyContent(direct_buffer), true)
}
}
@@ -188,7 +188,7 @@ class StompCodec extends AbstractProtoco
}
// we can optimize a little if the headers and content are in the same buffer..
- if( frame.are_headers_in_content_buffer ) {
+ if( frame.are_headers_in_content_buffer && frame.contiguous) {
val offset = frame.headers.head._1.offset;
val buffer1 = frame.headers.head._1;
@@ -229,13 +229,15 @@ class StompCodec extends AbstractProtoco
var line = readUntil(NEWLINE, max_command_length, "The maximum command length was exceeded")
if (line != null) {
var action = line.moveTail(-1)
+ var contiguous = true
if (trim) {
action = action.trim
} else if( trim_cr && action.length > 0 && action.get(action.length-1)=='\r'.toByte ) {
action.moveTail(-1)
+ contiguous = false
}
if (action.length > 0) {
- nextDecodeAction = read_headers(action.ascii)
+ nextDecodeAction = read_headers(action.ascii, contiguous)
return nextDecodeAction();
}
}
@@ -243,9 +245,10 @@ class StompCodec extends AbstractProtoco
}
}
- private def read_headers(command: AsciiBuffer): AbstractProtocolCodec.Action = new AbstractProtocolCodec.Action {
+ private def read_headers(command: AsciiBuffer, c:Boolean): AbstractProtocolCodec.Action = new AbstractProtocolCodec.Action {
var contentLength:AsciiBuffer = _
val headers = new ListBuffer[(AsciiBuffer, AsciiBuffer)]()
+ var contiguous = c;
def apply: AnyRef = {
var line = readUntil(NEWLINE, max_header_length, "The maximum header length was exceeded")
@@ -255,6 +258,7 @@ class StompCodec extends AbstractProtoco
line.moveTail(-1)
// 1.0 and 1.2 spec trims off the \r
if ( (trim || trim_cr) && line.length > 0 && line.get(line.length-1)=='\r'.toByte ) {
+ contiguous = false
line.moveTail(-1)
}
@@ -297,9 +301,9 @@ class StompCodec extends AbstractProtoco
if (max_data_length != -1 && length > max_data_length) {
throw new IOException("The maximum data length was exceeded")
}
- nextDecodeAction = read_binary_body(command, h, length)
+ nextDecodeAction = read_binary_body(command, h, length, contiguous)
} else {
- nextDecodeAction = read_text_body(command, h)
+ nextDecodeAction = read_text_body(command, h, contiguous)
}
return nextDecodeAction.apply()
}
@@ -308,7 +312,7 @@ class StompCodec extends AbstractProtoco
}
}
- private def read_binary_body(command: AsciiBuffer, headers:HeaderMap, contentLength: Int): AbstractProtocolCodec.Action = {
+ private def read_binary_body(command: AsciiBuffer, headers:HeaderMap, contentLength: Int, contiguous:Boolean): AbstractProtocolCodec.Action = {
return new AbstractProtocolCodec.Action {
def apply: AnyRef = {
var content = readBytes(contentLength + 1)
@@ -319,7 +323,7 @@ class StompCodec extends AbstractProtoco
nextDecodeAction = read_action
content.moveTail(-1)
val body = if( content.length() == 0) NilContent else BufferContent(content)
- return new StompFrame(command, headers, body)
+ return new StompFrame(command, headers, body, contiguous)
}
else {
return null
@@ -328,7 +332,7 @@ class StompCodec extends AbstractProtoco
}
}
- private def read_text_body(command: AsciiBuffer, headers:HeaderMap): AbstractProtocolCodec.Action = {
+ private def read_text_body(command: AsciiBuffer, headers:HeaderMap, contiguous:Boolean): AbstractProtocolCodec.Action = {
return new AbstractProtocolCodec.Action {
def apply: AnyRef = {
var content: Buffer = readUntil(0.asInstanceOf[Byte])
@@ -336,7 +340,7 @@ class StompCodec extends AbstractProtoco
nextDecodeAction = read_action
content.moveTail(-1)
val body = if( content.length() == 0) NilContent else BufferContent(content)
- return new StompFrame(command, headers, body)
+ return new StompFrame(command, headers, body, contiguous)
}
else {
return null
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala?rev=1403303&r1=1403302&r2=1403303&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala Mon Oct 29 14:10:56 2012
@@ -229,7 +229,7 @@ case class ZeroCopyContent(zero_copy_buf
*
* @author <a href="http://hiramchirino.com">chirino</a>
*/
-case class StompFrame(action:AsciiBuffer, headers:HeaderMap=Nil, content:StompContent=NilContent, updated_headers:HeaderMap=Nil) {
+case class StompFrame(action:AsciiBuffer, headers:HeaderMap=Nil, content:StompContent=NilContent, contiguous:Boolean=false, updated_headers:HeaderMap=Nil) {
def size_of_updated_headers = {
size_of(updated_headers)
@@ -267,17 +267,18 @@ case class StompFrame(action:AsciiBuffer
( headers.head._1.data eq content.asInstanceOf[BufferContent].content.data )
def size:Int = {
- content match {
- case x:BufferContent =>
- if( (action.data eq x.content.data) && updated_headers==Nil ) {
- return (x.content.offset-action.offset)+x.content.length
- }
- case _ =>
- }
-
- action.length + 1 +
- size_of_updated_headers +
- size_of_original_headers + 1 + content.length
+ if( contiguous ) {
+ content match {
+ case x:BufferContent =>
+ if( (action.data eq x.content.data) && updated_headers==Nil ) {
+ return (x.content.offset-action.offset)+x.content.length
+ }
+ case _ =>
+ }
+ }
+ action.length + 1 +
+ size_of_updated_headers +
+ size_of_original_headers + 1 + content.length
}
def header(name:AsciiBuffer) = {
@@ -286,7 +287,7 @@ case class StompFrame(action:AsciiBuffer
).map(_._2).getOrElse(null)
}
- def append_headers(value:HeaderMap) = StompFrame(action, headers, content, value ::: updated_headers)
+ def append_headers(value:HeaderMap) = StompFrame(action, headers, content, contiguous, value ::: updated_headers)
def retain = content.retain
def release = content.release
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1403303&r1=1403302&r2=1403303&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Mon Oct 29 14:10:56 2012
@@ -1300,9 +1300,9 @@ class StompProtocolHandler extends Proto
// We may need to add some headers..
var message = updated_headers(addresses, frame.headers) match {
case Nil=>
- StompFrameMessage(StompFrame(MESSAGE, frame.headers, frame.content))
+ StompFrameMessage(StompFrame(MESSAGE, frame.headers, frame.content, frame.contiguous))
case updated_headers =>
- StompFrameMessage(StompFrame(MESSAGE, frame.headers, frame.content, updated_headers))
+ StompFrameMessage(StompFrame(MESSAGE, frame.headers, frame.content, frame.contiguous, updated_headers))
}
val delivery = new Delivery