You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/08/20 19:20:40 UTC

svn commit: r1375111 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/ apollo-stomp/src/main/scala/org/apache/activemq/apollo/stom...

Author: chirino
Date: Mon Aug 20 17:20:40 2012
New Revision: 1375111

URL: http://svn.apache.org/viewvc?rev=1375111&view=rev
Log:
Change the sender field from being array based to being of List type to improve perf.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/DestinationParser.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=1375111&r1=1375110&r2=1375111&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Mon Aug 20 17:20:40 2012
@@ -21,10 +21,8 @@ import org.fusesource.hawtbuf._
 import org.apache.activemq.apollo.filter.Filterable
 import org.apache.activemq.apollo.broker.store.StoreUOW
 import org.apache.activemq.apollo.util.Log
-import java.util.concurrent.atomic.{AtomicReference, AtomicLong}
-import org.apache.activemq.apollo.dto.DestinationDTO
-import org.apache.activemq.apollo.broker.protocol.{MessageCodec, Protocol, ProtocolFactory}
-import scala.Array
+import java.util.concurrent.atomic.AtomicReference
+import org.apache.activemq.apollo.broker.protocol.MessageCodec
 
 object DeliveryProducer extends Log
 
@@ -120,7 +118,6 @@ trait Message extends Filterable with Re
  */
 object Delivery extends Sizer[Delivery] {
   def size(value:Delivery):Int = value.size
-  val NO_SENDER = Array[DestinationAddress]()
 }
 
 sealed trait DeliveryResult
@@ -160,12 +157,11 @@ object RetainRemove extends RetainAction
 object RetainIgnore extends RetainAction
 
 class Delivery {
-  import Delivery._
 
   /**
    * Where the delivery is originating from.
    */
-  var sender = NO_SENDER
+  var sender = List[DestinationAddress]()
 
   /**
    * Total size of the delivery.  Used for resource allocation tracking

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/DestinationParser.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/DestinationParser.scala?rev=1375111&r1=1375110&r2=1375111&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/DestinationParser.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/DestinationParser.scala Mon Aug 20 17:20:40 2012
@@ -45,7 +45,7 @@ class DestinationParser extends PathPars
     this
   }
 
-  def encode_destination(addresses: Array[_ <: DestinationAddress]): String = {
+  def encode_destination(addresses: Seq[_ <: DestinationAddress]): String = {
     if (addresses == null) {
       null
     } else {

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala?rev=1375111&r1=1375110&r2=1375111&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala Mon Aug 20 17:20:40 2012
@@ -78,7 +78,7 @@ class QueueEntry(val queue:Queue, val se
   }
 
   def init(qer:QueueEntryRecord):QueueEntry = {
-    val sender = qer.sender.map[DestinationAddress, Array[DestinationAddress]](x=> SimpleAddress(x.utf8().toString))
+    val sender = qer.sender.map(x=> SimpleAddress(x.utf8().toString))
     state = new Swapped(qer.message_key, qer.message_locator, qer.size, qer.expiration, qer.redeliveries, null, sender)
     this
   }
@@ -254,7 +254,7 @@ class QueueEntry(val queue:Queue, val se
 
     def message_locator: AtomicReference[Object] = null
 
-    def sender: Array[DestinationAddress] = Delivery.NO_SENDER
+    def sender = List[DestinationAddress]()
 
     /**
      * Attempts to dispatch the current entry to the subscriptions position at the entry.
@@ -547,8 +547,8 @@ class QueueEntry(val queue:Queue, val se
         if( _browser_copy==null ) {
           _browser_copy = delivery.copy
           // TODO: perhaps only avoid adding the address in the durable sub case..
-          if( _browser_copy.sender.length == 0 ) {
-            _browser_copy.sender = append(_browser_copy.sender, queue.address)
+          if( _browser_copy.sender == Nil ) {
+            _browser_copy.sender ::= queue.address
           }
         }
         _browser_copy
@@ -600,8 +600,8 @@ class QueueEntry(val queue:Queue, val se
 
                   val acquiredQueueEntry = sub.acquire(entry)
                   val acquiredDelivery = delivery.copy
-                  if( acquiredDelivery.sender.length == 0 ) {
-                    acquiredDelivery.sender = append(acquiredDelivery.sender, queue.address)
+                  if( acquiredDelivery.sender == Nil) {
+                    acquiredDelivery.sender ::= queue.address
                   }
 
                   acquiredDelivery.ack = (consumed, uow)=> {
@@ -653,7 +653,7 @@ class QueueEntry(val queue:Queue, val se
   * entry is persisted, it can move into this state.  This state only holds onto
   * the message key so that it can reload the message from the store quickly when needed.
    */
-  class Swapped(override val message_key:Long, override val message_locator:AtomicReference[Object], override val size:Int, override val expiration:Long, var _redeliveries:Short, var acquirer:Subscription, override  val sender:Array[DestinationAddress]) extends EntryState {
+  class Swapped(override val message_key:Long, override val message_locator:AtomicReference[Object], override val size:Int, override val expiration:Long, var _redeliveries:Short, var acquirer:Subscription, override  val sender:List[DestinationAddress]) extends EntryState {
 
     queue.individual_swapped_items += 1
 

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala?rev=1375111&r1=1375110&r2=1375111&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala Mon Aug 20 17:20:40 2012
@@ -152,7 +152,7 @@ class Topic(val router:LocalRouter, val 
       val copy = value.copy();
       copy.uow = value.uow
       copy.ack = value.ack
-      copy.sender = append(copy.sender, address)
+      copy.sender ::= address
       downstream.offer(copy)
     }
   }
@@ -417,7 +417,7 @@ class Topic(val router:LocalRouter, val 
     val r = retained_message
     if (r != null) {
       val copy = r.copy()
-      copy.sender = append(copy.sender, address)
+      copy.sender ::= address
 
       val producer = new  DeliveryProducerRoute(router) {
         refiller = NOOP

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala?rev=1375111&r1=1375110&r2=1375111&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala Mon Aug 20 17:20:40 2012
@@ -2,6 +2,8 @@ package org.apache.activemq.apollo.broke
 
 import java.io.{OutputStream, InputStream}
 import org.fusesource.hawtbuf.Buffer
+import org.apache.activemq.apollo.broker.DestinationAddress
+import collection.mutable.ListBuffer
 
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more
@@ -90,8 +92,8 @@ object PBSupport {
     }
     pb
   }
-  val EMPTY_BUFFER_ARRAY = Array[Buffer]()
   implicit def from_pb(pb: QueueEntryPB.Getter):QueueEntryRecord = {
+    import collection.JavaConversions._
     val rc = new QueueEntryRecord
     rc.queue_key = pb.getQueueKey
     rc.entry_seq = pb.getQueueSeq
@@ -102,9 +104,9 @@ object PBSupport {
     rc.redeliveries = pb.getRedeliveries.toShort
     var senderList = pb.getSenderList
     if( senderList!=null ) {
-      rc.sender = senderList.toArray(new Array[Buffer](senderList.size()))
+      rc.sender = senderList.toList
     } else {
-      rc.sender = EMPTY_BUFFER_ARRAY
+      rc.sender = List()
     }
     rc
   }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.scala?rev=1375111&r1=1375110&r2=1375111&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.scala Mon Aug 20 17:20:40 2012
@@ -21,6 +21,7 @@ package org.apache.activemq.apollo.broke
 
 import org.fusesource.hawtbuf.Buffer
 import java.util.concurrent.atomic.AtomicReference
+import collection.mutable.ListBuffer
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -35,6 +36,6 @@ class QueueEntryRecord {
   var size = 0
   var expiration = 0L
   var redeliveries:Short = 0
-  var sender:Array[Buffer] = _
+  var sender:List[Buffer] = _
 
 }

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1375111&r1=1375110&r2=1375111&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Mon Aug 20 17:20:40 2012
@@ -450,7 +450,7 @@ class StompProtocolHandler extends Proto
         var headers =  (MESSAGE_ID -> ascii(session_id.get+message_id_counter)) :: Nil
         headers ::= (CONTENT_TYPE -> ascii(content_type))
         headers ::= (CONTENT_LENGTH -> ascii(body.length().toString))
-        headers ::= (DESTINATION -> encode_header(destination_parser.encode_destination(delivery.sender.head)))
+        headers ::= (DESTINATION -> encode_header(destination_parser.encode_destination(delivery.sender.tail)))
         StompFrame(MESSAGE, headers, BufferContent(body))
       }
 

Modified: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util.scala?rev=1375111&r1=1375110&r2=1375111&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util.scala Mon Aug 20 17:20:40 2012
@@ -85,11 +85,4 @@ package object util {
     rc.get
   }
 
-  def append[T](src:Array[T], value:T)(implicit m: scala.reflect.Manifest[T]) = {
-    val rc = new Array[T](src.length+1)
-    System.arraycopy(rc, 0, src, 0, src.length)
-    rc(src.length) = value
-    rc
-  }
-
 }
\ No newline at end of file