You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 06:22:01 UTC

svn commit: r961215 - in /activemq/sandbox/activemq-apollo-actor: activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/ activemq-hawtdb/src/main/resources/META-INF/services/org.apache.activemq.apollo/ activemq-hawtdb/src/main/resources/META...

Author: chirino
Date: Wed Jul  7 04:22:00 2010
New Revision: 961215

URL: http://svn.apache.org/viewvc?rev=961215&view=rev
Log:
Hooked in the MemoryPool feature into the stomp protocol.  It can now handle HUGE messages without a problem, it does not even affect the JVM memory usage since they will get received and transmitted from a memory mapped file.

The store interfaces need to get hooked into the MemoryPool so that they do not bring in those huge messages into the JVM's memory space when they are working on persisting the message.

Added:
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/resources/META-INF/services/org.apache.activemq.apollo/memory-pools
      - copied, changed from r961214, activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/resources/META-INF/services/org/apache/activemq/apollo/memory-pools
Removed:
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/resources/META-INF/services/org/apache/activemq/apollo/memory-pools
Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBMemoryPoolSPI.scala
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala
    activemq/sandbox/activemq-apollo-actor/activemq-web/readme.md

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=961215&r1=961214&r2=961215&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Wed Jul  7 04:22:00 2010
@@ -74,7 +74,7 @@ trait DeliverySession extends Sink[Deliv
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-trait Message extends Filterable {
+trait Message extends Filterable with Retained {
 
   /**
    * the globally unique id of the message
@@ -132,7 +132,7 @@ object Delivery extends Sizer[Delivery] 
 class Delivery extends BaseRetained {
 
   /**
-   * memory size of the delivery.  Used for resource allocation tracking
+   * Total size of the delivery.  Used for resource allocation tracking
    */
   var size:Int = 0
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=961215&r1=961214&r2=961215&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Jul  7 04:22:00 2010
@@ -493,7 +493,7 @@ class Queue(val host: VirtualHost, val d
       if (session.full) {
         false
       } else {
-
+        delivery.message.retain
         if( tune_persistent && delivery.uow!=null ) {
           delivery.uow.retain
         }
@@ -755,7 +755,7 @@ class QueueEntry(val queue:Queue, val se
     def as_head:Head = null
 
     /**
-     * Gets the size of this entry in bytes.  The head and tail entries allways return 0.
+     * Gets the size of this entry in bytes.  The head and tail entries always return 0.
      */
     def size = 0
 
@@ -1000,8 +1000,9 @@ class QueueEntry(val queue:Queue, val se
       if( flushing ) {
         queue.flushing_size-=size
         queue.capacity_used -= size
-        state = new Flushed(delivery.storeKey, size)
+        delivery.message.release
 
+        state = new Flushed(delivery.storeKey, size)
         if( can_combine_with_prev ) {
           getPrevious.as_flushed_range.combineNext
         }
@@ -1020,6 +1021,7 @@ class QueueEntry(val queue:Queue, val se
         flushing = false
         queue.flushing_size-=size
       }
+      delivery.message.release
       queue.capacity_used -= size
       super.remove
     }
@@ -1169,7 +1171,7 @@ class QueueEntry(val queue:Queue, val se
         delivery.size = messageRecord.size
         delivery.storeKey = messageRecord.key
 
-        queue.capacity_used += size
+        queue.capacity_used += delivery.size
         queue.flushed_items -= 1
         state = new Loaded(delivery, true)
       } else {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=961215&r1=961214&r2=961215&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Wed Jul  7 04:22:00 2010
@@ -302,18 +302,20 @@ class DeliveryProducerRoute(val router:R
 
       // Do we need to store the message if we have a matching consumer?
       var storeOnMatch = delivery.message.persistent && router.host.store!=null
+      delivery.message.retain
 
       targets.foreach { target=>
 
         // only delivery to matching consumers
         if( target.consumer.matches(delivery) ) {
-          
+
           if( storeOnMatch ) {
             delivery.uow = router.host.store.createStoreUOW
             delivery.storeKey = delivery.uow.store(delivery.createMessageRecord)
             storeOnMatch = false
           }
 
+
           if( !target.offer(delivery) ) {
             overflowSessions ::= target
           }
@@ -340,6 +342,7 @@ class DeliveryProducerRoute(val router:R
     if (delivery.uow != null) {
       delivery.uow.release
     }
+    delivery.message.release
   }
 
   val drainer = ^{

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=961215&r1=961214&r2=961215&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Wed Jul  7 04:22:00 2010
@@ -34,7 +34,7 @@ import org.apache.activemq.apollo.dto.{H
 import java.io.File
 import java.util.concurrent.TimeUnit
 import org.apache.activemq.apollo.util.LongCounter
-import org.apache.activemq.apollo.MemoryPool
+import org.apache.activemq.apollo.{MemoryPoolFactory, MemoryPool}
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -127,6 +127,17 @@ class VirtualHost(val broker: Broker, va
 
 
   override protected def _start(onCompleted:Runnable):Unit = {
+
+//    val memory_pool_config: String = null
+    val memory_pool_config: String = "hawtdb:activemq.tmp"
+
+    if( MemoryPoolFactory.validate(memory_pool_config) ) {
+      memory_pool = MemoryPoolFactory.create(memory_pool_config)
+      if( memory_pool!=null ) {
+        memory_pool.start
+      }
+    }
+
     val tracker = new LoggingTracker("virtual host startup", dispatchQueue)
     store = StoreFactory.create(config.store)
     if( store!=null ) {
@@ -208,6 +219,11 @@ class VirtualHost(val broker: Broker, va
 //        }
 //        done.await();
 
+    if( memory_pool!=null ) {
+      memory_pool.stop
+      memory_pool = null
+    }
+
     if( store!=null ) {
       store.stop(onCompleted);
     } else {

Copied: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/resources/META-INF/services/org.apache.activemq.apollo/memory-pools (from r961214, activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/resources/META-INF/services/org/apache/activemq/apollo/memory-pools)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/resources/META-INF/services/org.apache.activemq.apollo/memory-pools?p2=activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/resources/META-INF/services/org.apache.activemq.apollo/memory-pools&p1=activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/resources/META-INF/services/org/apache/activemq/apollo/memory-pools&r1=961214&r2=961215&rev=961215&view=diff
==============================================================================
    (empty)

Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBMemoryPoolSPI.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBMemoryPoolSPI.scala?rev=961215&r1=961214&r2=961215&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBMemoryPoolSPI.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBMemoryPoolSPI.scala Wed Jul  7 04:22:00 2010
@@ -39,7 +39,6 @@ class HawtDBMemoryPoolSPI extends Memory
     if( config.startsWith(prefix) ) {
       val file = new File(config.substring(prefix.length))
       new HawtDBMemoryPool(file)
-      null
     } else {
       null
     }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala?rev=961215&r1=961214&r2=961215&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala Wed Jul  7 04:22:00 2010
@@ -24,6 +24,7 @@ import java.lang.{String, Class}
 import java.io.DataOutput
 import org.apache.activemq.apollo.broker._
 import org.apache.activemq.apollo.MemoryAllocation
+import org.fusesource.hawtdispatch.BaseRetained
 
 /**
  *
@@ -153,12 +154,17 @@ case class StompFrameMessage(frame:Stomp
     }
   }
 
+
+  def setDisposer(disposer: Runnable) = throw new UnsupportedOperationException
+  def retained = throw new UnsupportedOperationException
+  def retain = frame.retain
+  def release = frame.release
 }
 
 
 
 object StompFrame extends Sizer[StompFrame] {
-  def size(value:StompFrame) = value.size   
+  def size(value:StompFrame) = value.size
 }
 
 trait StompContent {
@@ -170,6 +176,8 @@ trait StompContent {
 
   def utf8:UTF8Buffer
 
+  def retain = {}
+  def release = {}
 }
 
 object NilStompContent extends StompContent {
@@ -208,6 +216,9 @@ case class DirectStompContent(direct:Mem
   def utf8:UTF8Buffer = {
     buffer.utf8
   }
+
+  override def retain = direct.retain
+  override def release = direct.release
 }
 
 /**
@@ -272,4 +283,6 @@ case class StompFrame(action:AsciiBuffer
     ).map(_._2).getOrElse(null)
   }
 
+  def retain = content.retain
+  def release = content.release
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=961215&r1=961214&r2=961215&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Wed Jul  7 04:22:00 2010
@@ -138,6 +138,7 @@ class StompProtocolHandler extends Proto
             }
           }
           val frame = delivery.message.asInstanceOf[StompFrameMessage].frame
+          frame.retain
           val rc = session.offer(frame)
           assert(rc, "offer should be accepted since it was not full")
           true
@@ -163,7 +164,8 @@ class StompProtocolHandler extends Proto
   var pendingAcks = HashMap[AsciiBuffer, (StoreUOW)=>Unit]()
 
   override def onTransportConnected() = {
-    session_manager = new SinkMux[StompFrame]( MapSink(connection.transportSink){ x=>x }, dispatchQueue, StompFrame)
+
+    session_manager = new SinkMux[StompFrame]( MapSink(connection.transportSink){x=>x}, dispatchQueue, StompFrame)
     connection_sink = new OverflowSink(session_manager.open(dispatchQueue));
     connection_sink.refiller = ^{}
     
@@ -275,6 +277,7 @@ class StompProtocolHandler extends Proto
         }
 
       case None=>
+        frame.release
         die("destination not set.")
     }
   }
@@ -328,8 +331,7 @@ class StompProtocolHandler extends Proto
         connection_sink.offer(StompFrame(Responses.RECEIPT, List((Stomp.Headers.Response.RECEIPT_ID, receipt))))
       }
     }
-
-
+    frame.release
   }
 
   def on_stomp_subscribe(headers:HeaderMap) = {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala?rev=961215&r1=961214&r2=961215&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala Wed Jul  7 04:22:00 2010
@@ -152,9 +152,11 @@ class StompWireFormat extends WireFormat
 
   var next_write_buffer = new DataByteArrayOutputStream(write_buffer_size)
   var next_write_direct:ByteBuffer = null
+  var next_write_direct_frame:StompFrame = null
 
   var write_buffer = ByteBuffer.allocate(0)
   var write_direct:ByteBuffer = null
+  var write_direct_frame:StompFrame = null
 
   def is_full = next_write_direct!=null || next_write_buffer.size() >= (write_buffer_size >> 2)
   def is_empty = write_buffer.remaining() == 0 && write_direct==null
@@ -205,6 +207,7 @@ class StompWireFormat extends WireFormat
       val buffer2 = frame.content.asInstanceOf[BufferStompContent].content;
       val length = (buffer2.offset-buffer1.offset)+buffer2.length
       os.write( buffer1.data, offset, length)
+      END_OF_FRAME_BUFFER.writeTo(os)
 
     } else {
       for( (key, value) <- frame.headers ) {
@@ -218,14 +221,15 @@ class StompWireFormat extends WireFormat
       frame.content match {
         case x:DirectStompContent=>
           next_write_direct = x.direct.buffer.duplicate
-          next_write_direct.limit(next_write_direct.limit-1)
+          next_write_direct.clear
+          next_write_direct_frame = frame
         case x:BufferStompContent=>
           x.content.writeTo(os)
+          END_OF_FRAME_BUFFER.writeTo(os)
         case _=>
+          END_OF_FRAME_BUFFER.writeTo(os)
       }
-
     }
-    END_OF_FRAME_BUFFER.writeTo(os)
   }
 
 
@@ -237,6 +241,11 @@ class StompWireFormat extends WireFormat
     }
     if ( write_buffer.remaining() == 0 && write_direct!=null ) {
       write_counter += write_channel.write(write_direct)
+      if( write_direct.remaining() == 0 ) {
+        write_direct = null
+        write_direct_frame.release
+        write_direct_frame = null
+      }
     }
 
     // if it is now empty try to refill...
@@ -245,8 +254,11 @@ class StompWireFormat extends WireFormat
         val prev_size = (write_buffer.position()+512).max(512).min(write_buffer_size)
         write_buffer = next_write_buffer.toBuffer().toByteBuffer()
         write_direct = next_write_direct
+        write_direct_frame = next_write_direct_frame
+
         next_write_buffer = new DataByteArrayOutputStream(prev_size)
         next_write_direct = null
+        next_write_direct_frame = null
     }
 
     if ( is_empty ) {
@@ -382,13 +394,13 @@ class StompWireFormat extends WireFormat
           action = action.trim()
       }
       if (action.length() > 0) {
-          next_action = read_headers(action)
+          next_action = read_headers(action.ascii)
       }
     }
     null
   }
 
-  def read_headers(action:Buffer, headers:HeaderMapBuffer=new HeaderMapBuffer()):FrameReader = (buffer)=> {
+  def read_headers(action:AsciiBuffer, headers:HeaderMapBuffer=new HeaderMapBuffer()):FrameReader = (buffer)=> {
     var rc:StompFrame = null
     val line = read_line(buffer, MAX_HEADER_LENGTH, "The maximum header length was exceeded")
     if( line !=null ) {
@@ -434,7 +446,10 @@ class StompWireFormat extends WireFormat
               throw new IOException("The maximum data length was exceeded")
           }
 
-          if( length > 1024 && memory_pool!=null ) {
+          // lets try to keep the content of big message outside of the JVM's garbage collection
+          // to keep the number of GCs down when moving big messages.
+          def is_message = action == Commands.SEND || action == Responses.MESSAGE
+          if( length > 1024 && memory_pool!=null && is_message) {
 
             val ma = memory_pool.alloc(length+1)
 
@@ -492,7 +507,7 @@ class StompWireFormat extends WireFormat
   }
 
 
-  def read_binary_body_direct(action:Buffer, headers:HeaderMapBuffer, ma:MemoryAllocation):FrameReader = (buffer)=> {
+  def read_binary_body_direct(action:AsciiBuffer, headers:HeaderMapBuffer, ma:MemoryAllocation):FrameReader = (buffer)=> {
     if( read_content_direct(ma) ) {
       next_action = read_action
       new StompFrame(ascii(action), headers.toList, DirectStompContent(ma))
@@ -521,7 +536,7 @@ class StompWireFormat extends WireFormat
       }
   }
 
-  def read_binary_body(action:Buffer, headers:HeaderMapBuffer, contentLength:Int):FrameReader = (buffer)=> {
+  def read_binary_body(action:AsciiBuffer, headers:HeaderMapBuffer, contentLength:Int):FrameReader = (buffer)=> {
     val content:Buffer=read_content(buffer, contentLength)
     if( content != null ) {
       next_action = read_action
@@ -562,7 +577,7 @@ class StompWireFormat extends WireFormat
   }
 
 
-  def read_text_body(action:Buffer, headers:HeaderMapBuffer):FrameReader = (buffer)=> {
+  def read_text_body(action:AsciiBuffer, headers:HeaderMapBuffer):FrameReader = (buffer)=> {
     val content:Buffer=read_to_null(buffer)
     if( content != null ) {
       next_action = read_action

Modified: activemq/sandbox/activemq-apollo-actor/activemq-web/readme.md
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-web/readme.md?rev=961215&r1=961214&r2=961215&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-web/readme.md (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-web/readme.md Wed Jul  7 04:22:00 2010
@@ -4,4 +4,4 @@ Run with ./run
 
 test with a json client.. perhaps curl like this:
 
-curl -i -H "Accept: application/json" localhost:8080/default
\ No newline at end of file
+curl -i -H "Accept: application/json" localhost:8080/brokers
\ No newline at end of file