You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/02/17 15:42:46 UTC

svn commit: r1245580 - /activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala

Author: chirino
Date: Fri Feb 17 14:42:46 2012
New Revision: 1245580

URL: http://svn.apache.org/viewvc?rev=1245580&view=rev
Log:
Revert "Fix for APLO-160 Apollo becoming unresponsive when stressed with 48k connections." This change had a negative performance impact, instead
we should look into using the strategy outlined in APLO-163 to deal with large connection counts.

Modified:
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala?rev=1245580&r1=1245579&r2=1245580&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala Fri Feb 17 14:42:46 2012
@@ -19,17 +19,19 @@ package org.apache.activemq.apollo.stomp
 import _root_.org.apache.activemq.apollo.broker._
 
 import java.nio.ByteBuffer
+import collection.mutable.{ListBuffer, HashMap}
 import Stomp._
 
 import BufferConversions._
 import _root_.scala.collection.JavaConversions._
-import java.io.{EOFException, DataOutput, IOException}
+import java.io.{EOFException, DataOutput, DataInput, IOException}
 import java.nio.channels.{SocketChannel, WritableByteChannel, ReadableByteChannel}
 import org.fusesource.hawtdispatch.transport._
 import _root_.org.fusesource.hawtbuf._
 import Buffer._
 import org.apache.activemq.apollo.util._
 import org.apache.activemq.apollo.broker.store.{DirectBuffer, DirectBufferAllocator, MessageRecord}
+import org.apache.activemq.apollo.util.Log._
 
 object StompCodec extends Log {
 
@@ -160,8 +162,7 @@ class StompCodec extends ProtocolCodec {
   var write_counter = 0L
   var write_channel:WritableByteChannel = null
 
-  var next_write_buffer_size = write_buffer_size
-  var next_write_buffer:DataByteArrayOutputStream = null
+  var next_write_buffer = new DataByteArrayOutputStream(write_buffer_size)
   var next_write_direct:DirectBuffer = null
 
   var write_buffer = ByteBuffer.allocate(0)
@@ -169,7 +170,7 @@ class StompCodec extends ProtocolCodec {
   var write_direct_pos = 0
   var last_write_io_size = 0
 
-  def full = next_write_direct!=null || ( next_write_buffer!=null && next_write_buffer.size >= (write_buffer_size >> 1))
+  def full = next_write_direct!=null || next_write_buffer.size >= (write_buffer_size >> 1)
   def is_empty = write_buffer.remaining == 0 && write_direct==null
 
   def setWritableByteChannel(channel: WritableByteChannel) = {
@@ -187,9 +188,6 @@ class StompCodec extends ProtocolCodec {
     if ( full) {
       ProtocolCodec.BufferState.FULL
     } else {
-      if(next_write_buffer==null) {
-        next_write_buffer = new DataByteArrayOutputStream(next_write_buffer_size)
-      }
       val was_empty = is_empty
       command match {
         case buffer:Buffer=>
@@ -278,14 +276,15 @@ class StompCodec extends ProtocolCodec {
             }
           }
         } else {
-          if( next_write_buffer == null || next_write_buffer.size()==0 ) {
+          if( next_write_buffer.size()==0 ) {
             return ProtocolCodec.BufferState.EMPTY
           } else {
             // size of next buffer is based on how much was used in the previous buffer.
-            next_write_buffer_size = (write_buffer.position()+512).max(512).min(write_buffer_size)
+            val prev_size = (write_buffer.position()+512).max(512).min(write_buffer_size)
             write_buffer = next_write_buffer.toBuffer().toByteBuffer()
             write_direct = next_write_direct
-            next_write_buffer = null
+
+            next_write_buffer = new DataByteArrayOutputStream(prev_size)
             next_write_direct = null
           }
         }
@@ -307,7 +306,7 @@ class StompCodec extends ProtocolCodec {
   var read_buffer_size = 1024*64
   var read_channel:ReadableByteChannel = null
 
-  var read_buffer:ByteBuffer = null
+  var read_buffer = ByteBuffer.allocate(read_buffer_size)
   var read_end = 0
   var read_start = 0
 
@@ -328,8 +327,7 @@ class StompCodec extends ProtocolCodec {
 
   def unread(buffer: Array[Byte]) = {
     assert(read_counter == 0)
-    read_buffer = ByteBuffer.wrap(buffer)
-    read_buffer.position(read_buffer.capacity())
+    read_buffer.put(buffer)
     read_counter += buffer.length
   }
 
@@ -338,9 +336,7 @@ class StompCodec extends ProtocolCodec {
   def getLastReadSize = last_read_io_size
 
   override def read():Object = {
-    if( read_buffer == null ) {
-      read_buffer = ByteBuffer.allocate(read_buffer_size)
-    }
+
     var command:Object = null
     while( command==null ) {
       // do we need to read in more data???
@@ -390,11 +386,6 @@ class StompCodec extends ProtocolCodec {
           if (last_read_io_size == -1) {
               throw new EOFException("Peer disconnected")
           } else if (last_read_io_size == 0) {
-              if( read_start == read_buffer.position() ) {
-                read_start = 0
-                read_end = 0
-                read_buffer = null
-              }
               return null
           }
           read_counter += last_read_io_size