You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/08/29 00:17:45 UTC
svn commit: r1378366 - in /activemq/activemq-apollo/trunk/apollo-stomp/src:
main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala
Author: chirino
Date: Tue Aug 28 22:17:44 2012
New Revision: 1378366
URL: http://svn.apache.org/viewvc?rev=1378366&view=rev
Log:
Fixes APLO-252: Spurious bytes added by the STOMP UDP Protocol
Modified:
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala?rev=1378366&r1=1378365&r2=1378366&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala Tue Aug 28 22:17:44 2012
@@ -88,10 +88,10 @@ object StompCodec extends Log {
}
def decode(message: MessageRecord):StompFrameMessage = {
- new StompFrameMessage(decode_frame(message.buffer, message.direct_buffer))
+ new StompFrameMessage(decode_frame(message.buffer, message.direct_buffer, false))
}
- def decode_frame(buffer: Buffer, direct_buffer:DirectBuffer=null):StompFrame = {
+ def decode_frame(buffer: Buffer, direct_buffer:DirectBuffer=null, end_check:Boolean=true):StompFrame = {
def read_line = {
val pos = buffer.indexOf('\n'.toByte)
if( pos<0 ) {
@@ -108,6 +108,7 @@ object StompCodec extends Log {
val action = read_line
val headers = new HeaderMapBuffer()
+ var contentLength:AsciiBuffer = null
var line = read_line
while( line.length() > 0 ) {
@@ -119,6 +120,9 @@ object StompCodec extends Log {
var name = line.slice(0, seperatorIndex)
var value = line.slice(seperatorIndex + 1, line.length)
headers.add((name, value))
+ if (end_check && contentLength==null && name == CONTENT_LENGTH ) {
+ contentLength = value
+ }
} catch {
case e:Exception=>
throw new IOException("Unable to parser header line [" + line + "]")
@@ -126,6 +130,27 @@ object StompCodec extends Log {
line = read_line
}
+ if ( end_check ) {
+ buffer.length = if (contentLength != null) {
+ val length = try {
+ contentLength.toString.toInt
+ } catch {
+ case e: NumberFormatException =>
+ throw new IOException("Specified content-length is not a valid integer")
+ }
+ if( length > buffer.length ) {
+ throw new IOException("Frame did not contain enough bytes to satisfy the content-length")
+ }
+ length
+ } else {
+ val pos = buffer.indexOf(0.toByte)
+ if( pos < 0 ) {
+ throw new IOException("Frame is not null terminated")
+ }
+ pos
+ }
+ }
+
if( direct_buffer==null ) {
new StompFrame(action, headers.toList, BufferContent(buffer))
} else {
@@ -241,7 +266,7 @@ class StompCodec extends AbstractProtoco
value = value.trim
}
var entry = (name.ascii, value.ascii)
- if (entry._1 == CONTENT_LENGTH && contentLength==null) {
+ if (contentLength==null && entry._1 == CONTENT_LENGTH) {
contentLength = entry._2
}
headers.add(entry)
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala?rev=1378366&r1=1378365&r2=1378366&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala Tue Aug 28 22:17:44 2012
@@ -167,6 +167,8 @@ class StompParallelTest extends StompTes
channel.send(new AsciiBuffer("Hello").toByteBuffer, target)
assert_received("Hello")
+ channel.send(new AsciiBuffer("World").toByteBuffer, target)
+ assert_received("World")
}
test("STOMP UDP to STOMP interop") {
@@ -185,9 +187,17 @@ class StompParallelTest extends StompTes
"login:admin\n" +
"passcode:password\n" +
"\n" +
- "Hello STOMP-UDP").toByteBuffer, target)
+ "Hello\u0000\n").toByteBuffer, target)
+ assert_received("Hello")
- assert_received("Hello STOMP-UDP")
+ channel.send(new AsciiBuffer(
+ "SEND\n" +
+ "destination:/topic/some-other-udp\n" +
+ "login:admin\n" +
+ "passcode:password\n" +
+ "\n" +
+ "World\u0000\n").toByteBuffer, target)
+ assert_received("World")
}
/**