You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/09/11 18:34:28 UTC

svn commit: r1383472 - in /activemq/activemq-apollo/trunk/apollo-stomp/src: main/scala/org/apache/activemq/apollo/stomp/ test/scala/org/apache/activemq/apollo/stomp/test/

Author: chirino
Date: Tue Sep 11 16:34:27 2012
New Revision: 1383472

URL: http://svn.apache.org/viewvc?rev=1383472&view=rev
Log:
Fixes APLO-259: Support Telnet clients that send '\r\n' character to terminate a line.

Also integrates and expands the test case in the patch submitted by Christian Posta in APLO-255

Modified:
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala?rev=1383472&r1=1383471&r2=1383472&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala Tue Sep 11 16:34:27 2012
@@ -166,6 +166,7 @@ class StompCodec extends AbstractProtoco
   var max_headers: Int = 1000
   var max_data_length: Int = 1024 * 1024 * 100
   var trim = true
+  var trim_cr = false
 
   protected def encode(command: AnyRef) = command match {
     case buffer:Buffer=> buffer.writeTo(nextWriteBuffer.asInstanceOf[DataOutput])
@@ -230,6 +231,8 @@ class StompCodec extends AbstractProtoco
         var action = line.moveTail(-1)
         if (trim) {
           action = action.trim
+        } else if( trim_cr && action.length > 0 && action.get(action.length-1)=='\r'.toByte ) {
+          action.moveTail(-1)
         }
         if (action.length > 0) {
           nextDecodeAction = read_headers(action.ascii)
@@ -247,7 +250,14 @@ class StompCodec extends AbstractProtoco
     def apply: AnyRef = {
       var line = readUntil(NEWLINE, max_header_length, "The maximum header length was exceeded")
       if (line != null) {
-        line = line.moveTail(-1)
+
+        // Strip off the \n
+        line.moveTail(-1)
+        // 1.0 and 1.2 spec trims off the \r
+        if ( (trim || trim_cr) && line.length > 0 && line.get(line.length-1)=='\r'.toByte ) {
+          line.moveTail(-1)
+        }
+
         if (line.length > 0) {
           if (max_headers != -1 && headers.size > max_headers) {
             throw new IOException("The maximum number of headers was exceeded")

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala?rev=1383472&r1=1383471&r2=1383472&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala Tue Sep 11 16:34:27 2012
@@ -326,11 +326,14 @@ object Stomp {
   val END_OF_FRAME_BUFFER = new Buffer(Array(NULL, NEWLINE))
   val COLON: Byte = ':'
   val COLON_BUFFER = new Buffer(Array(COLON))
+  val CR: Byte = '\r'
+  val CR_BUFFER = new Buffer(Array(CR))
   val ESCAPE:Byte = '\\'
 
   val ESCAPE_ESCAPE_SEQ = ascii("""\\""")
   val COLON_ESCAPE_SEQ = ascii("""\c""")
   val NEWLINE_ESCAPE_SEQ = ascii("""\n""")
+  val CR_ESCAPE_SEQ = ascii("""\r""")
 
 
   ///////////////////////////////////////////////////////////////////
@@ -432,11 +435,12 @@ object Stomp {
 
   val V1_0 = ascii("1.0")
   val V1_1 = ascii("1.1")
+  val V1_2 = ascii("1.2")
   val DEFAULT_HEART_BEAT = ascii("0,0")
 
   val EMPTY = EMPTY_BUFFER.ascii()
 
-  val SUPPORTED_PROTOCOL_VERSIONS = List(V1_1, V1_0)
+  val SUPPORTED_PROTOCOL_VERSIONS = List(V1_2, V1_1, V1_0)
 
   val TEXT_PLAIN = ascii("text/plain")
 

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1383472&r1=1383471&r2=1383472&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Tue Sep 11 16:34:27 2012
@@ -104,6 +104,9 @@ object StompProtocolHandler extends Log 
       } else if( pos.startsWith(NEWLINE_ESCAPE_SEQ) ) {
         rc.write(NEWLINE)
         pos.moveHead(2)
+      } else if( pos.startsWith(CR_ESCAPE_SEQ) ) {
+        rc.write(CR)
+        pos.moveHead(2)
       } else {
         rc.write(pos.data(pos.offset))
         pos.moveHead(1)
@@ -116,6 +119,17 @@ object StompProtocolHandler extends Log 
     protocol_version match {
       case null => utf8(value).ascii
       case V1_0 => utf8(value).ascii
+      case V1_1 =>
+        val data = value.getBytes("UTF-8")
+        var rc = new ByteArrayOutputStream(data.length)
+        data.foreach {
+          case ESCAPE  => rc.write(ESCAPE_ESCAPE_SEQ)
+          case COLON   => rc.write(COLON_ESCAPE_SEQ)
+          case NEWLINE => rc.write(COLON_ESCAPE_SEQ)
+          case c       => rc.write(c)
+        }
+        rc.toBuffer.ascii
+
       case _ =>
 
         val data = value.getBytes("UTF-8")
@@ -124,8 +138,8 @@ object StompProtocolHandler extends Log 
           case ESCAPE  => rc.write(ESCAPE_ESCAPE_SEQ)
           case COLON   => rc.write(COLON_ESCAPE_SEQ)
           case NEWLINE => rc.write(COLON_ESCAPE_SEQ)
+          case CR      => rc.write(CR_ESCAPE_SEQ)
           case c       => rc.write(c)
-
         }
         rc.toBuffer.ascii
     }
@@ -946,6 +960,10 @@ class StompProtocolHandler extends Proto
     if( protocol_version != V1_0 ) {
       // disable trimming...
       codec.trim = false
+      if( protocol_version != V1_1 ) {
+        // enable \r triming
+        codec.trim_cr = true
+      }
     }
 
     val heart_beat = get(headers, HEART_BEAT).getOrElse(DEFAULT_HEART_BEAT)

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala?rev=1383472&r1=1383471&r2=1383472&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala Tue Sep 11 16:34:27 2012
@@ -1464,5 +1464,33 @@ class StompParallelTest extends StompTes
     Thread.sleep(1000)
   }
 
+  test("Sending as a Telnet client"){
+    client.open("localhost", port)
+
+    client.out.write("CONNECT\r\n".getBytes)
+    client.out.write("accept-version:1.2\r\n".getBytes)
+    client.out.write("login:admin\r\n".getBytes)
+    client.out.write("passcode:password\r\n".getBytes)
+    client.out.write("\r\n".getBytes)
+    client.out.write("\u0000\r\n".getBytes)
+    client.out.flush
+
+    val frame = client.receive()
+    frame should startWith("CONNECTED\n")
+    frame should include regex ("""session:.+?\n""")
+
+    client.out.write((
+      "SUBSCRIBE\r\n" +
+      "id:0\r\n" +
+      "destination:/queue/somedest\r\n" +
+      "receipt:0\r\n" +
+      "\r\n"+
+      "\u0000\r\n"
+    ).getBytes)
+
+    client.out.flush
+    wait_for_receipt("0")
+
+  }
 
 }