You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/08/29 00:17:45 UTC

svn commit: r1378366 - in /activemq/activemq-apollo/trunk/apollo-stomp/src: main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala

Author: chirino
Date: Tue Aug 28 22:17:44 2012
New Revision: 1378366

URL: http://svn.apache.org/viewvc?rev=1378366&view=rev
Log:
Fixes APLO-252: Spurious bytes added by the STOMP UDP Protocol

Modified:
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala?rev=1378366&r1=1378365&r2=1378366&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala Tue Aug 28 22:17:44 2012
@@ -88,10 +88,10 @@ object StompCodec extends Log {
   }
 
   def decode(message: MessageRecord):StompFrameMessage = {
-    new StompFrameMessage(decode_frame(message.buffer, message.direct_buffer))
+    new StompFrameMessage(decode_frame(message.buffer, message.direct_buffer, false))
   }
 
-  def decode_frame(buffer: Buffer, direct_buffer:DirectBuffer=null):StompFrame = {
+  def decode_frame(buffer: Buffer, direct_buffer:DirectBuffer=null, end_check:Boolean=true):StompFrame = {
     def read_line = {
       val pos = buffer.indexOf('\n'.toByte)
       if( pos<0 ) {
@@ -108,6 +108,7 @@ object StompCodec extends Log {
     val action = read_line
 
     val headers = new HeaderMapBuffer()
+    var contentLength:AsciiBuffer = null
 
     var line = read_line
     while( line.length() > 0 ) {
@@ -119,6 +120,9 @@ object StompCodec extends Log {
           var name = line.slice(0, seperatorIndex)
           var value = line.slice(seperatorIndex + 1, line.length)
           headers.add((name, value))
+          if (end_check && contentLength==null && name == CONTENT_LENGTH ) {
+            contentLength = value
+          }
       } catch {
           case e:Exception=>
             throw new IOException("Unable to parser header line [" + line + "]")
@@ -126,6 +130,27 @@ object StompCodec extends Log {
       line = read_line
     }
 
+    if ( end_check ) {
+      buffer.length = if (contentLength != null) {
+        val length = try {
+          contentLength.toString.toInt
+        } catch {
+          case e: NumberFormatException =>
+            throw new IOException("Specified content-length is not a valid integer")
+        }
+        if( length > buffer.length ) {
+          throw new IOException("Frame did not contain enough bytes to satisfy the content-length")
+        }
+        length
+      } else {
+        val pos = buffer.indexOf(0.toByte)
+        if( pos < 0 ) {
+          throw new IOException("Frame is not null terminated")
+        }
+        pos
+      }
+    }
+
     if( direct_buffer==null ) {
       new StompFrame(action, headers.toList, BufferContent(buffer))
     } else {
@@ -241,7 +266,7 @@ class StompCodec extends AbstractProtoco
               value = value.trim
             }
             var entry = (name.ascii, value.ascii)
-            if (entry._1 == CONTENT_LENGTH && contentLength==null) {
+            if (contentLength==null && entry._1 == CONTENT_LENGTH) {
               contentLength = entry._2
             }
             headers.add(entry)

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala?rev=1378366&r1=1378365&r2=1378366&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala Tue Aug 28 22:17:44 2012
@@ -167,6 +167,8 @@ class StompParallelTest extends StompTes
     channel.send(new AsciiBuffer("Hello").toByteBuffer, target)
 
     assert_received("Hello")
+    channel.send(new AsciiBuffer("World").toByteBuffer, target)
+    assert_received("World")
   }
 
   test("STOMP UDP to STOMP interop") {
@@ -185,9 +187,17 @@ class StompParallelTest extends StompTes
       "login:admin\n" +
       "passcode:password\n" +
       "\n" +
-      "Hello STOMP-UDP").toByteBuffer, target)
+      "Hello\u0000\n").toByteBuffer, target)
+    assert_received("Hello")
 
-    assert_received("Hello STOMP-UDP")
+    channel.send(new AsciiBuffer(
+      "SEND\n" +
+      "destination:/topic/some-other-udp\n" +
+      "login:admin\n" +
+      "passcode:password\n" +
+      "\n" +
+      "World\u0000\n").toByteBuffer, target)
+    assert_received("World")
   }
 
   /**