You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/04/27 19:07:47 UTC

svn commit: r1097184 - in /activemq/activemq-apollo/trunk: apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/

Author: chirino
Date: Wed Apr 27 17:07:47 2011
New Revision: 1097184

URL: http://svn.apache.org/viewvc?rev=1097184&view=rev
Log:
Make the STOMP codec limits configurable: maximum command length, maximum header length, maximum number of headers, and maximum data length.

Modified:
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/StompDTO.java
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/StompDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/StompDTO.java?rev=1097184&r1=1097183&r2=1097184&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/StompDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/StompDTO.java Wed Apr 27 17:07:47 2011
@@ -38,5 +38,16 @@ public class StompDTO extends ProtocolDT
     @XmlElement(name="add_user_header")
     public List<AddUserHeaderDTO> add_user_headers = new ArrayList<AddUserHeaderDTO>();
 
+    @XmlAttribute(name="max_command_length")
+    public Integer max_command_length;
+
+    @XmlAttribute(name="max_header_length")
+    public Integer max_header_length;
+
+    @XmlAttribute(name="max_headers")
+    public Integer max_headers;
+
+    @XmlAttribute(name="max_data_length")
+    public Integer max_data_length;
 
 }

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala?rev=1097184&r1=1097183&r2=1097184&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala Wed Apr 27 17:07:47 2011
@@ -33,13 +33,6 @@ import org.apache.activemq.apollo.util._
 import org.apache.activemq.apollo.broker.store.{ZeroCopyBuffer, ZeroCopyBufferAllocator, MessageRecord}
 
 object StompCodec extends Log {
-    val READ_BUFFFER_SIZE = 1024*64;
-    val MAX_COMMAND_LENGTH = 1024;
-    val MAX_HEADER_LENGTH = 1024 * 10;
-    val MAX_HEADERS = 1000;
-    val MAX_DATA_LENGTH = 1024 * 1024 * 100;
-    val SIZE_CHECK=false
-
 
   def encode(message: StompFrameMessage):MessageRecord = {
     val frame = message.frame
@@ -145,6 +138,11 @@ class StompCodec extends ProtocolCodec {
 
   import StompCodec._
 
+  var max_command_length = 1024
+  var max_header_length = 1024*10
+  var max_headers = 1000
+  var max_data_length = 1024 * 1024 * 100
+
   var zero_copy_buffer_allocator:ZeroCopyBufferAllocator = null
 
   implicit def wrap(x: Buffer) = ByteBuffer.wrap(x.data, x.offset, x.length);
@@ -316,7 +314,8 @@ class StompCodec extends ProtocolCodec {
   def setReadableByteChannel(channel: ReadableByteChannel) = {
     this.read_channel = channel
     if( this.read_channel.isInstanceOf[SocketChannel] ) {
-      this.read_channel.asInstanceOf[SocketChannel].socket().setReceiveBufferSize(read_buffer_size);
+      read_buffer_size = this.read_channel.asInstanceOf[SocketChannel].socket().getReceiveBufferSize
+      read_buffer = ByteBuffer.allocate(read_buffer_size)
     }
   }
 
@@ -391,7 +390,7 @@ class StompCodec extends ProtocolCodec {
     return command
   }
 
-  def read_line(buffer:ByteBuffer, maxLength:Int, errorMessage:String):Buffer = {
+  def read_line(buffer:ByteBuffer, max:Int, errorMessage:String):Buffer = {
       val read_limit = buffer.position
       while( read_end < read_limit ) {
         if( buffer.array()(read_end) =='\n') {
@@ -400,7 +399,7 @@ class StompCodec extends ProtocolCodec {
           read_start = read_end
           return rc
         }
-        if (SIZE_CHECK && read_end-read_start > maxLength) {
+        if (max != -1 && read_end-read_start > max) {
             throw new IOException(errorMessage)
         }
         read_end += 1
@@ -409,7 +408,7 @@ class StompCodec extends ProtocolCodec {
   }
 
   def read_action:FrameReader = (buffer)=> {
-    val line = read_line(buffer, MAX_COMMAND_LENGTH, "The maximum command length was exceeded")
+    val line = read_line(buffer, max_command_length, "The maximum command length was exceeded")
     if( line !=null ) {
       var action = line
       if( trim ) {
@@ -424,11 +423,11 @@ class StompCodec extends ProtocolCodec {
 
   def read_headers(action:AsciiBuffer, headers:HeaderMapBuffer=new HeaderMapBuffer()):FrameReader = (buffer)=> {
     var rc:StompFrame = null
-    val line = read_line(buffer, MAX_HEADER_LENGTH, "The maximum header length was exceeded")
+    val line = read_line(buffer, max_header_length, "The maximum header length was exceeded")
     if( line !=null ) {
       if( line.trim().length > 0 ) {
 
-        if (SIZE_CHECK && headers.size > MAX_HEADERS) {
+        if (max_headers != -1 && headers.size > max_headers) {
             throw new IOException("The maximum number of headers was exceeded")
         }
 
@@ -464,7 +463,7 @@ class StompCodec extends ProtocolCodec {
               throw new IOException("Specified content-length is not a valid integer")
           }
 
-          if (SIZE_CHECK && length > MAX_DATA_LENGTH) {
+          if (max_data_length != -1 && length > max_data_length) {
               throw new IOException("The maximum data length was exceeded")
           }
 

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1097184&r1=1097183&r2=1097184&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Wed Apr 27 17:07:47 2011
@@ -347,7 +347,16 @@ class StompProtocolHandler extends Proto
   override def set_connection(connection: BrokerConnection) = {
     super.set_connection(connection)
     import collection.JavaConversions._
+
+    val codec = connection.transport.getProtocolCodec.asInstanceOf[StompCodec]
     config = connection.connector.config.protocols.find( _.isInstanceOf[StompDTO]).map(_.asInstanceOf[StompDTO]).getOrElse(new StompDTO)
+
+    import OptionSupport._
+    config.max_command_length.foreach( codec.max_command_length = _ )
+    config.max_data_length.foreach( codec.max_data_length = _ )
+    config.max_header_length.foreach( codec.max_header_length = _ )
+    config.max_headers.foreach( codec.max_headers = _ )
+
   }
 
   override def create_connection_status = {