You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/02/17 03:25:28 UTC

svn commit: r1245297 - /activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala

Author: chirino
Date: Fri Feb 17 02:25:28 2012
New Revision: 1245297

URL: http://svn.apache.org/viewvc?rev=1245297&view=rev
Log:
Fix for APLO-160 Apollo becoming unresponsive when stressed with 48k connections.

This fix releases the read and write buffers when they are not needed.

Modified:
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala?rev=1245297&r1=1245296&r2=1245297&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala Fri Feb 17 02:25:28 2012
@@ -19,19 +19,17 @@ package org.apache.activemq.apollo.stomp
 import _root_.org.apache.activemq.apollo.broker._
 
 import java.nio.ByteBuffer
-import collection.mutable.{ListBuffer, HashMap}
 import Stomp._
 
 import BufferConversions._
 import _root_.scala.collection.JavaConversions._
-import java.io.{EOFException, DataOutput, DataInput, IOException}
+import java.io.{EOFException, DataOutput, IOException}
 import java.nio.channels.{SocketChannel, WritableByteChannel, ReadableByteChannel}
 import org.fusesource.hawtdispatch.transport._
 import _root_.org.fusesource.hawtbuf._
 import Buffer._
 import org.apache.activemq.apollo.util._
 import org.apache.activemq.apollo.broker.store.{DirectBuffer, DirectBufferAllocator, MessageRecord}
-import org.apache.activemq.apollo.util.Log._
 
 object StompCodec extends Log {
 
@@ -162,7 +160,8 @@ class StompCodec extends ProtocolCodec {
   var write_counter = 0L
   var write_channel:WritableByteChannel = null
 
-  var next_write_buffer = new DataByteArrayOutputStream(write_buffer_size)
+  var next_write_buffer_size = write_buffer_size
+  var next_write_buffer:DataByteArrayOutputStream = null
   var next_write_direct:DirectBuffer = null
 
   var write_buffer = ByteBuffer.allocate(0)
@@ -170,7 +169,7 @@ class StompCodec extends ProtocolCodec {
   var write_direct_pos = 0
   var last_write_io_size = 0
 
-  def full = next_write_direct!=null || next_write_buffer.size >= (write_buffer_size >> 1)
+  def full = next_write_direct!=null || ( next_write_buffer!=null && next_write_buffer.size >= (write_buffer_size >> 1))
   def is_empty = write_buffer.remaining == 0 && write_direct==null
 
   def setWritableByteChannel(channel: WritableByteChannel) = {
@@ -188,6 +187,9 @@ class StompCodec extends ProtocolCodec {
     if ( full) {
       ProtocolCodec.BufferState.FULL
     } else {
+      if(next_write_buffer==null) {
+        next_write_buffer = new DataByteArrayOutputStream(next_write_buffer_size)
+      }
       val was_empty = is_empty
       command match {
         case buffer:Buffer=>
@@ -276,15 +278,14 @@ class StompCodec extends ProtocolCodec {
             }
           }
         } else {
-          if( next_write_buffer.size()==0 ) {
+          if( next_write_buffer == null || next_write_buffer.size()==0 ) {
             return ProtocolCodec.BufferState.EMPTY
           } else {
             // size of next buffer is based on how much was used in the previous buffer.
-            val prev_size = (write_buffer.position()+512).max(512).min(write_buffer_size)
+            next_write_buffer_size = (write_buffer.position()+512).max(512).min(write_buffer_size)
             write_buffer = next_write_buffer.toBuffer().toByteBuffer()
             write_direct = next_write_direct
-
-            next_write_buffer = new DataByteArrayOutputStream(prev_size)
+            next_write_buffer = null
             next_write_direct = null
           }
         }
@@ -306,7 +307,7 @@ class StompCodec extends ProtocolCodec {
   var read_buffer_size = 1024*64
   var read_channel:ReadableByteChannel = null
 
-  var read_buffer = ByteBuffer.allocate(read_buffer_size)
+  var read_buffer:ByteBuffer = null
   var read_end = 0
   var read_start = 0
 
@@ -327,7 +328,8 @@ class StompCodec extends ProtocolCodec {
 
   def unread(buffer: Array[Byte]) = {
     assert(read_counter == 0)
-    read_buffer.put(buffer)
+    read_buffer = ByteBuffer.wrap(buffer)
+    read_buffer.position(read_buffer.capacity())
     read_counter += buffer.length
   }
 
@@ -336,7 +338,9 @@ class StompCodec extends ProtocolCodec {
   def getLastReadSize = last_read_io_size
 
   override def read():Object = {
-
+    if( read_buffer == null ) {
+      read_buffer = ByteBuffer.allocate(read_buffer_size)
+    }
     var command:Object = null
     while( command==null ) {
       // do we need to read in more data???
@@ -386,6 +390,11 @@ class StompCodec extends ProtocolCodec {
           if (last_read_io_size == -1) {
               throw new EOFException("Peer disconnected")
           } else if (last_read_io_size == 0) {
+              if( read_start == read_buffer.position() ) {
+                read_start = 0
+                read_end = 0
+                read_buffer = null
+              }
               return null
           }
           read_counter += last_read_io_size