You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/05/18 23:19:56 UTC

svn commit: r1124434 - /activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala

Author: chirino
Date: Wed May 18 21:19:55 2011
New Revision: 1124434

URL: http://svn.apache.org/viewvc?rev=1124434&view=rev
Log:
Fixes https://issues.apache.org/jira/browse/APLO-13 : Apollo does not encode colons in header. Also make sure we only encode headers in STOMP 1.1, and generate a more unique message id.

Modified:
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1124434&r1=1124433&r2=1124434&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Wed May 18 21:19:55 2011
@@ -25,7 +25,6 @@ import java.lang.String
 import protocol.{HeartBeatMonitor, ProtocolHandler}
 import security.SecurityContext
 import Stomp._
-import java.io.IOException
 import org.apache.activemq.apollo.selector.SelectorParser
 import org.apache.activemq.apollo.filter.{BooleanExpression, FilterException}
 import org.apache.activemq.apollo.broker.store._
@@ -37,6 +36,23 @@ import org.apache.activemq.apollo.dto._
 import org.apache.activemq.apollo.transport.tcp.SslTransport
 import java.security.cert.X509Certificate
 import collection.mutable.{ArrayBuffer, ListBuffer, HashMap}
+import java.io.{File, IOException}
+
+
+case class RichBuffer(self:Buffer) extends Proxy {
+  def + (rhs: Buffer) = {
+    val rc = new Buffer(self.length + rhs.length)
+    System.arraycopy(self.data, self.offset, rc.data, rc.offset, self.length)
+    System.arraycopy(rhs.data, rhs.offset, rc.data, rc.offset+self.length, rhs.length)
+    rc
+  }
+}
+
+object BufferSupport {
+  implicit def to_rich_buffer(value:Buffer):RichBuffer = RichBuffer(value)
+}
+
+import BufferSupport._
 
 object StompProtocolHandler extends Log {
 
@@ -89,16 +105,22 @@ class StompProtocolHandler extends Proto
   }
 
   def encode_header(value:String) = {
-    val data = value.getBytes("UTF-8")
-    var rc = new ByteArrayOutputStream(data.length)
-    data.foreach {
-      case ESCAPE  => rc.write(ESCAPE_ESCAPE_SEQ)
-      case COLON   => rc.write(COLON_ESCAPE_SEQ)
-      case NEWLINE => rc.write(COLON_ESCAPE_SEQ)
-      case c       => rc.write(c)
+    protocol_version match {
+      case null => utf8(value).ascii
+      case V1_0 => utf8(value).ascii
+      case _ =>
 
+        val data = value.getBytes("UTF-8")
+        var rc = new ByteArrayOutputStream(data.length)
+        data.foreach {
+          case ESCAPE  => rc.write(ESCAPE_ESCAPE_SEQ)
+          case COLON   => rc.write(COLON_ESCAPE_SEQ)
+          case NEWLINE => rc.write(COLON_ESCAPE_SEQ)
+          case c       => rc.write(c)
+
+        }
+        rc.toBuffer.ascii
     }
-    rc.toBuffer.ascii
   }
 
   protected def dispatchQueue:DispatchQueue = connection.dispatch_queue
@@ -338,13 +360,13 @@ class StompProtocolHandler extends Proto
   // uses by STOMP 1.0 clients
   var connection_ack_handlers = HashMap[AsciiBuffer, AckHandler]()
 
-  var session_id:AsciiBuffer = _
   var protocol_version:AsciiBuffer = _
 
   var heart_beat_monitor:HeartBeatMonitor = new HeartBeatMonitor
   val security_context = new SecurityContext
   var waiting_on:String = "client request"
   var config:StompDTO = _
+  var session_id:AsciiBuffer = _
 
   override def set_connection(connection: BrokerConnection) = {
     super.set_connection(connection)
@@ -730,7 +752,6 @@ class StompProtocolHandler extends Proto
     }
   }
 
-
   var message_id_counter = 0;
 
   def updated_headers(headers:HeaderMap) = {
@@ -738,9 +759,9 @@ class StompProtocolHandler extends Proto
 
     // Do we need to add the message id?
     if( get( headers, MESSAGE_ID) == None ) {
-      // TODO: properly generate mesage ids
       message_id_counter += 1
-      rc ::= (MESSAGE_ID, ascii("msg:"+message_id_counter))
+      val msgid: Buffer = session_id + encode_header(message_id_counter.toString())
+      rc ::= (MESSAGE_ID -> msgid.ascii)
     }
 
     // Do we need to add the user id?