You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/02/17 15:42:46 UTC
svn commit: r1245580 -
/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
Author: chirino
Date: Fri Feb 17 14:42:46 2012
New Revision: 1245580
URL: http://svn.apache.org/viewvc?rev=1245580&view=rev
Log:
Revert "Fix for APLO-160 Apollo becoming unresponsive when stressed with 48k connections." This change had a negative performance impact, instead
we should look into using the strategy outlined in APLO-163 to deal with large connection counts.
Modified:
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala?rev=1245580&r1=1245579&r2=1245580&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala Fri Feb 17 14:42:46 2012
@@ -19,17 +19,19 @@ package org.apache.activemq.apollo.stomp
import _root_.org.apache.activemq.apollo.broker._
import java.nio.ByteBuffer
+import collection.mutable.{ListBuffer, HashMap}
import Stomp._
import BufferConversions._
import _root_.scala.collection.JavaConversions._
-import java.io.{EOFException, DataOutput, IOException}
+import java.io.{EOFException, DataOutput, DataInput, IOException}
import java.nio.channels.{SocketChannel, WritableByteChannel, ReadableByteChannel}
import org.fusesource.hawtdispatch.transport._
import _root_.org.fusesource.hawtbuf._
import Buffer._
import org.apache.activemq.apollo.util._
import org.apache.activemq.apollo.broker.store.{DirectBuffer, DirectBufferAllocator, MessageRecord}
+import org.apache.activemq.apollo.util.Log._
object StompCodec extends Log {
@@ -160,8 +162,7 @@ class StompCodec extends ProtocolCodec {
var write_counter = 0L
var write_channel:WritableByteChannel = null
- var next_write_buffer_size = write_buffer_size
- var next_write_buffer:DataByteArrayOutputStream = null
+ var next_write_buffer = new DataByteArrayOutputStream(write_buffer_size)
var next_write_direct:DirectBuffer = null
var write_buffer = ByteBuffer.allocate(0)
@@ -169,7 +170,7 @@ class StompCodec extends ProtocolCodec {
var write_direct_pos = 0
var last_write_io_size = 0
- def full = next_write_direct!=null || ( next_write_buffer!=null && next_write_buffer.size >= (write_buffer_size >> 1))
+ def full = next_write_direct!=null || next_write_buffer.size >= (write_buffer_size >> 1)
def is_empty = write_buffer.remaining == 0 && write_direct==null
def setWritableByteChannel(channel: WritableByteChannel) = {
@@ -187,9 +188,6 @@ class StompCodec extends ProtocolCodec {
if ( full) {
ProtocolCodec.BufferState.FULL
} else {
- if(next_write_buffer==null) {
- next_write_buffer = new DataByteArrayOutputStream(next_write_buffer_size)
- }
val was_empty = is_empty
command match {
case buffer:Buffer=>
@@ -278,14 +276,15 @@ class StompCodec extends ProtocolCodec {
}
}
} else {
- if( next_write_buffer == null || next_write_buffer.size()==0 ) {
+ if( next_write_buffer.size()==0 ) {
return ProtocolCodec.BufferState.EMPTY
} else {
// size of next buffer is based on how much was used in the previous buffer.
- next_write_buffer_size = (write_buffer.position()+512).max(512).min(write_buffer_size)
+ val prev_size = (write_buffer.position()+512).max(512).min(write_buffer_size)
write_buffer = next_write_buffer.toBuffer().toByteBuffer()
write_direct = next_write_direct
- next_write_buffer = null
+
+ next_write_buffer = new DataByteArrayOutputStream(prev_size)
next_write_direct = null
}
}
@@ -307,7 +306,7 @@ class StompCodec extends ProtocolCodec {
var read_buffer_size = 1024*64
var read_channel:ReadableByteChannel = null
- var read_buffer:ByteBuffer = null
+ var read_buffer = ByteBuffer.allocate(read_buffer_size)
var read_end = 0
var read_start = 0
@@ -328,8 +327,7 @@ class StompCodec extends ProtocolCodec {
def unread(buffer: Array[Byte]) = {
assert(read_counter == 0)
- read_buffer = ByteBuffer.wrap(buffer)
- read_buffer.position(read_buffer.capacity())
+ read_buffer.put(buffer)
read_counter += buffer.length
}
@@ -338,9 +336,7 @@ class StompCodec extends ProtocolCodec {
def getLastReadSize = last_read_io_size
override def read():Object = {
- if( read_buffer == null ) {
- read_buffer = ByteBuffer.allocate(read_buffer_size)
- }
+
var command:Object = null
while( command==null ) {
// do we need to read in more data???
@@ -390,11 +386,6 @@ class StompCodec extends ProtocolCodec {
if (last_read_io_size == -1) {
throw new EOFException("Peer disconnected")
} else if (last_read_io_size == 0) {
- if( read_start == read_buffer.position() ) {
- read_start = 0
- read_end = 0
- read_buffer = null
- }
return null
}
read_counter += last_read_io_size