You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/09/11 18:34:28 UTC
svn commit: r1383472 - in /activemq/activemq-apollo/trunk/apollo-stomp/src:
main/scala/org/apache/activemq/apollo/stomp/
test/scala/org/apache/activemq/apollo/stomp/test/
Author: chirino
Date: Tue Sep 11 16:34:27 2012
New Revision: 1383472
URL: http://svn.apache.org/viewvc?rev=1383472&view=rev
Log:
Fixes APLO-259: Support Telnet clients that send the '\r\n' character sequence to terminate a line.
Also integrates and expands the test case from the patch submitted by Christian Posta in APLO-255.
Modified:
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala?rev=1383472&r1=1383471&r2=1383472&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala Tue Sep 11 16:34:27 2012
@@ -166,6 +166,7 @@ class StompCodec extends AbstractProtoco
var max_headers: Int = 1000
var max_data_length: Int = 1024 * 1024 * 100
var trim = true
+ var trim_cr = false
protected def encode(command: AnyRef) = command match {
case buffer:Buffer=> buffer.writeTo(nextWriteBuffer.asInstanceOf[DataOutput])
@@ -230,6 +231,8 @@ class StompCodec extends AbstractProtoco
var action = line.moveTail(-1)
if (trim) {
action = action.trim
+ } else if( trim_cr && action.length > 0 && action.get(action.length-1)=='\r'.toByte ) {
+ action.moveTail(-1)
}
if (action.length > 0) {
nextDecodeAction = read_headers(action.ascii)
@@ -247,7 +250,14 @@ class StompCodec extends AbstractProtoco
def apply: AnyRef = {
var line = readUntil(NEWLINE, max_header_length, "The maximum header length was exceeded")
if (line != null) {
- line = line.moveTail(-1)
+
+ // Strip off the \n
+ line.moveTail(-1)
+ // The 1.0 and 1.2 specs trim off the trailing \r
+ if ( (trim || trim_cr) && line.length > 0 && line.get(line.length-1)=='\r'.toByte ) {
+ line.moveTail(-1)
+ }
+
if (line.length > 0) {
if (max_headers != -1 && headers.size > max_headers) {
throw new IOException("The maximum number of headers was exceeded")
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala?rev=1383472&r1=1383471&r2=1383472&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala Tue Sep 11 16:34:27 2012
@@ -326,11 +326,14 @@ object Stomp {
val END_OF_FRAME_BUFFER = new Buffer(Array(NULL, NEWLINE))
val COLON: Byte = ':'
val COLON_BUFFER = new Buffer(Array(COLON))
+ val CR: Byte = '\r'
+ val CR_BUFFER = new Buffer(Array(CR))
val ESCAPE:Byte = '\\'
val ESCAPE_ESCAPE_SEQ = ascii("""\\""")
val COLON_ESCAPE_SEQ = ascii("""\c""")
val NEWLINE_ESCAPE_SEQ = ascii("""\n""")
+ val CR_ESCAPE_SEQ = ascii("""\r""")
///////////////////////////////////////////////////////////////////
@@ -432,11 +435,12 @@ object Stomp {
val V1_0 = ascii("1.0")
val V1_1 = ascii("1.1")
+ val V1_2 = ascii("1.2")
val DEFAULT_HEART_BEAT = ascii("0,0")
val EMPTY = EMPTY_BUFFER.ascii()
- val SUPPORTED_PROTOCOL_VERSIONS = List(V1_1, V1_0)
+ val SUPPORTED_PROTOCOL_VERSIONS = List(V1_2, V1_1, V1_0)
val TEXT_PLAIN = ascii("text/plain")
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1383472&r1=1383471&r2=1383472&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Tue Sep 11 16:34:27 2012
@@ -104,6 +104,9 @@ object StompProtocolHandler extends Log
} else if( pos.startsWith(NEWLINE_ESCAPE_SEQ) ) {
rc.write(NEWLINE)
pos.moveHead(2)
+ } else if( pos.startsWith(CR_ESCAPE_SEQ) ) {
+ rc.write(CR)
+ pos.moveHead(2)
} else {
rc.write(pos.data(pos.offset))
pos.moveHead(1)
@@ -116,6 +119,17 @@ object StompProtocolHandler extends Log
protocol_version match {
case null => utf8(value).ascii
case V1_0 => utf8(value).ascii
+ case V1_1 => // STOMP 1.1 header escaping: only backslash, colon and line feed are escaped (no CR escape here)
+ val data = value.getBytes("UTF-8")
+ val rc = new ByteArrayOutputStream(data.length)
+ data.foreach {
+ case ESCAPE => rc.write(ESCAPE_ESCAPE_SEQ)
+ case COLON => rc.write(COLON_ESCAPE_SEQ)
+ case NEWLINE => rc.write(NEWLINE_ESCAPE_SEQ) // line feed must be escaped as "\n"; "\c" would decode back as ':'
+ case c => rc.write(c)
+ }
+ rc.toBuffer.ascii
+
case _ =>
val data = value.getBytes("UTF-8")
@@ -124,8 +138,8 @@ object StompProtocolHandler extends Log
case ESCAPE => rc.write(ESCAPE_ESCAPE_SEQ)
case COLON => rc.write(COLON_ESCAPE_SEQ)
case NEWLINE => rc.write(COLON_ESCAPE_SEQ)
+ case CR => rc.write(CR_ESCAPE_SEQ)
case c => rc.write(c)
-
}
rc.toBuffer.ascii
}
@@ -946,6 +960,10 @@ class StompProtocolHandler extends Proto
if( protocol_version != V1_0 ) {
// disable trimming...
codec.trim = false
+ if( protocol_version != V1_1 ) {
+ // enable \r trimming
+ codec.trim_cr = true
+ }
}
val heart_beat = get(headers, HEART_BEAT).getOrElse(DEFAULT_HEART_BEAT)
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala?rev=1383472&r1=1383471&r2=1383472&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala Tue Sep 11 16:34:27 2012
@@ -1464,5 +1464,33 @@ class StompParallelTest extends StompTes
Thread.sleep(1000)
}
+ test("Sending as a Telnet client"){
+ client.open("localhost", port)
+
+ client.out.write("CONNECT\r\n".getBytes)
+ client.out.write("accept-version:1.2\r\n".getBytes)
+ client.out.write("login:admin\r\n".getBytes)
+ client.out.write("passcode:password\r\n".getBytes)
+ client.out.write("\r\n".getBytes)
+ client.out.write("\u0000\r\n".getBytes)
+ client.out.flush
+
+ val frame = client.receive()
+ frame should startWith("CONNECTED\n")
+ frame should include regex ("""session:.+?\n""")
+
+ client.out.write((
+ "SUBSCRIBE\r\n" +
+ "id:0\r\n" +
+ "destination:/queue/somedest\r\n" +
+ "receipt:0\r\n" +
+ "\r\n"+
+ "\u0000\r\n"
+ ).getBytes)
+
+ client.out.flush
+ wait_for_receipt("0")
+
+ }
}