You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/05/18 23:19:56 UTC
svn commit: r1124434 -
/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
Author: chirino
Date: Wed May 18 21:19:55 2011
New Revision: 1124434
URL: http://svn.apache.org/viewvc?rev=1124434&view=rev
Log:
Fixes https://issues.apache.org/jira/browse/APLO-13 : Apollo does not encode colons in header. Also, make sure we only encode headers in STOMP 1.1, and generate a more unique message id.
Modified:
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1124434&r1=1124433&r2=1124434&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Wed May 18 21:19:55 2011
@@ -25,7 +25,6 @@ import java.lang.String
import protocol.{HeartBeatMonitor, ProtocolHandler}
import security.SecurityContext
import Stomp._
-import java.io.IOException
import org.apache.activemq.apollo.selector.SelectorParser
import org.apache.activemq.apollo.filter.{BooleanExpression, FilterException}
import org.apache.activemq.apollo.broker.store._
@@ -37,6 +36,23 @@ import org.apache.activemq.apollo.dto._
import org.apache.activemq.apollo.transport.tcp.SslTransport
import java.security.cert.X509Certificate
import collection.mutable.{ArrayBuffer, ListBuffer, HashMap}
+import java.io.{File, IOException}
+
+
+case class RichBuffer(self:Buffer) extends Proxy {
+ def + (rhs: Buffer) = {
+ val rc = new Buffer(self.length + rhs.length)
+ System.arraycopy(self.data, self.offset, rc.data, rc.offset, self.length)
+ System.arraycopy(rhs.data, rhs.offset, rc.data, rc.offset+self.length, rhs.length)
+ rc
+ }
+}
+
+object BufferSupport {
+ implicit def to_rich_buffer(value:Buffer):RichBuffer = RichBuffer(value)
+}
+
+import BufferSupport._
object StompProtocolHandler extends Log {
@@ -89,16 +105,22 @@ class StompProtocolHandler extends Proto
}
def encode_header(value:String) = {
- val data = value.getBytes("UTF-8")
- var rc = new ByteArrayOutputStream(data.length)
- data.foreach {
- case ESCAPE => rc.write(ESCAPE_ESCAPE_SEQ)
- case COLON => rc.write(COLON_ESCAPE_SEQ)
- case NEWLINE => rc.write(COLON_ESCAPE_SEQ)
- case c => rc.write(c)
+ protocol_version match {
+ case null => utf8(value).ascii
+ case V1_0 => utf8(value).ascii
+ case _ =>
+ val data = value.getBytes("UTF-8")
+ var rc = new ByteArrayOutputStream(data.length)
+ data.foreach {
+ case ESCAPE => rc.write(ESCAPE_ESCAPE_SEQ)
+ case COLON => rc.write(COLON_ESCAPE_SEQ)
+ case NEWLINE => rc.write(COLON_ESCAPE_SEQ)
+ case c => rc.write(c)
+
+ }
+ rc.toBuffer.ascii
}
- rc.toBuffer.ascii
}
protected def dispatchQueue:DispatchQueue = connection.dispatch_queue
@@ -338,13 +360,13 @@ class StompProtocolHandler extends Proto
// uses by STOMP 1.0 clients
var connection_ack_handlers = HashMap[AsciiBuffer, AckHandler]()
- var session_id:AsciiBuffer = _
var protocol_version:AsciiBuffer = _
var heart_beat_monitor:HeartBeatMonitor = new HeartBeatMonitor
val security_context = new SecurityContext
var waiting_on:String = "client request"
var config:StompDTO = _
+ var session_id:AsciiBuffer = _
override def set_connection(connection: BrokerConnection) = {
super.set_connection(connection)
@@ -730,7 +752,6 @@ class StompProtocolHandler extends Proto
}
}
-
var message_id_counter = 0;
def updated_headers(headers:HeaderMap) = {
@@ -738,9 +759,9 @@ class StompProtocolHandler extends Proto
// Do we need to add the message id?
if( get( headers, MESSAGE_ID) == None ) {
- // TODO: properly generate mesage ids
message_id_counter += 1
- rc ::= (MESSAGE_ID, ascii("msg:"+message_id_counter))
+ val msgid: Buffer = session_id + encode_header(message_id_counter.toString())
+ rc ::= (MESSAGE_ID -> msgid.ascii)
}
// Do we need to add the user id?