You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/04/27 19:07:47 UTC
svn commit: r1097184 - in /activemq/activemq-apollo/trunk:
apollo-dto/src/main/java/org/apache/activemq/apollo/dto/
apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/
Author: chirino
Date: Wed Apr 27 17:07:47 2011
New Revision: 1097184
URL: http://svn.apache.org/viewvc?rev=1097184&view=rev
Log:
Make the stomp max allowed headers sizes and friends configurable.
Modified:
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/StompDTO.java
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/StompDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/StompDTO.java?rev=1097184&r1=1097183&r2=1097184&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/StompDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/StompDTO.java Wed Apr 27 17:07:47 2011
@@ -38,5 +38,16 @@ public class StompDTO extends ProtocolDT
@XmlElement(name="add_user_header")
public List<AddUserHeaderDTO> add_user_headers = new ArrayList<AddUserHeaderDTO>();
+ @XmlAttribute(name="max_command_length")
+ public Integer max_command_length;
+
+ @XmlAttribute(name="max_header_length")
+ public Integer max_header_length;
+
+ @XmlAttribute(name="max_headers")
+ public Integer max_headers;
+
+ @XmlAttribute(name="max_data_length")
+ public Integer max_data_length;
}
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala?rev=1097184&r1=1097183&r2=1097184&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala Wed Apr 27 17:07:47 2011
@@ -33,13 +33,6 @@ import org.apache.activemq.apollo.util._
import org.apache.activemq.apollo.broker.store.{ZeroCopyBuffer, ZeroCopyBufferAllocator, MessageRecord}
object StompCodec extends Log {
- val READ_BUFFFER_SIZE = 1024*64;
- val MAX_COMMAND_LENGTH = 1024;
- val MAX_HEADER_LENGTH = 1024 * 10;
- val MAX_HEADERS = 1000;
- val MAX_DATA_LENGTH = 1024 * 1024 * 100;
- val SIZE_CHECK=false
-
def encode(message: StompFrameMessage):MessageRecord = {
val frame = message.frame
@@ -145,6 +138,11 @@ class StompCodec extends ProtocolCodec {
import StompCodec._
+ var max_command_length = 1024
+ var max_header_length = 1024*10
+ var max_headers = 1000
+ var max_data_length = 1024 * 1024 * 100
+
var zero_copy_buffer_allocator:ZeroCopyBufferAllocator = null
implicit def wrap(x: Buffer) = ByteBuffer.wrap(x.data, x.offset, x.length);
@@ -316,7 +314,8 @@ class StompCodec extends ProtocolCodec {
def setReadableByteChannel(channel: ReadableByteChannel) = {
this.read_channel = channel
if( this.read_channel.isInstanceOf[SocketChannel] ) {
- this.read_channel.asInstanceOf[SocketChannel].socket().setReceiveBufferSize(read_buffer_size);
+ read_buffer_size = this.read_channel.asInstanceOf[SocketChannel].socket().getReceiveBufferSize
+ read_buffer = ByteBuffer.allocate(read_buffer_size)
}
}
@@ -391,7 +390,7 @@ class StompCodec extends ProtocolCodec {
return command
}
- def read_line(buffer:ByteBuffer, maxLength:Int, errorMessage:String):Buffer = {
+ def read_line(buffer:ByteBuffer, max:Int, errorMessage:String):Buffer = {
val read_limit = buffer.position
while( read_end < read_limit ) {
if( buffer.array()(read_end) =='\n') {
@@ -400,7 +399,7 @@ class StompCodec extends ProtocolCodec {
read_start = read_end
return rc
}
- if (SIZE_CHECK && read_end-read_start > maxLength) {
+ if (max != -1 && read_end-read_start > max) {
throw new IOException(errorMessage)
}
read_end += 1
@@ -409,7 +408,7 @@ class StompCodec extends ProtocolCodec {
}
def read_action:FrameReader = (buffer)=> {
- val line = read_line(buffer, MAX_COMMAND_LENGTH, "The maximum command length was exceeded")
+ val line = read_line(buffer, max_command_length, "The maximum command length was exceeded")
if( line !=null ) {
var action = line
if( trim ) {
@@ -424,11 +423,11 @@ class StompCodec extends ProtocolCodec {
def read_headers(action:AsciiBuffer, headers:HeaderMapBuffer=new HeaderMapBuffer()):FrameReader = (buffer)=> {
var rc:StompFrame = null
- val line = read_line(buffer, MAX_HEADER_LENGTH, "The maximum header length was exceeded")
+ val line = read_line(buffer, max_header_length, "The maximum header length was exceeded")
if( line !=null ) {
if( line.trim().length > 0 ) {
- if (SIZE_CHECK && headers.size > MAX_HEADERS) {
+ if (max_headers != -1 && headers.size > max_headers) {
throw new IOException("The maximum number of headers was exceeded")
}
@@ -464,7 +463,7 @@ class StompCodec extends ProtocolCodec {
throw new IOException("Specified content-length is not a valid integer")
}
- if (SIZE_CHECK && length > MAX_DATA_LENGTH) {
+ if (max_data_length != -1 && length > max_data_length) {
throw new IOException("The maximum data length was exceeded")
}
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1097184&r1=1097183&r2=1097184&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Wed Apr 27 17:07:47 2011
@@ -347,7 +347,16 @@ class StompProtocolHandler extends Proto
override def set_connection(connection: BrokerConnection) = {
super.set_connection(connection)
import collection.JavaConversions._
+
+ val codec = connection.transport.getProtocolCodec.asInstanceOf[StompCodec]
config = connection.connector.config.protocols.find( _.isInstanceOf[StompDTO]).map(_.asInstanceOf[StompDTO]).getOrElse(new StompDTO)
+
+ import OptionSupport._
+ config.max_command_length.foreach( codec.max_command_length = _ )
+ config.max_data_length.foreach( codec.max_data_length = _ )
+ config.max_header_length.foreach( codec.max_header_length = _ )
+ config.max_headers.foreach( codec.max_headers = _ )
+
}
override def create_connection_status = {