You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/08/20 19:20:40 UTC
svn commit: r1375111 - in /activemq/activemq-apollo/trunk:
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/
apollo-stomp/src/main/scala/org/apache/activemq/apollo/stom...
Author: chirino
Date: Mon Aug 20 17:20:40 2012
New Revision: 1375111
URL: http://svn.apache.org/viewvc?rev=1375111&view=rev
Log:
Change the sender field from being array based to being of List type to improve perf.
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/DestinationParser.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util.scala
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=1375111&r1=1375110&r2=1375111&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Mon Aug 20 17:20:40 2012
@@ -21,10 +21,8 @@ import org.fusesource.hawtbuf._
import org.apache.activemq.apollo.filter.Filterable
import org.apache.activemq.apollo.broker.store.StoreUOW
import org.apache.activemq.apollo.util.Log
-import java.util.concurrent.atomic.{AtomicReference, AtomicLong}
-import org.apache.activemq.apollo.dto.DestinationDTO
-import org.apache.activemq.apollo.broker.protocol.{MessageCodec, Protocol, ProtocolFactory}
-import scala.Array
+import java.util.concurrent.atomic.AtomicReference
+import org.apache.activemq.apollo.broker.protocol.MessageCodec
object DeliveryProducer extends Log
@@ -120,7 +118,6 @@ trait Message extends Filterable with Re
*/
object Delivery extends Sizer[Delivery] {
def size(value:Delivery):Int = value.size
- val NO_SENDER = Array[DestinationAddress]()
}
sealed trait DeliveryResult
@@ -160,12 +157,11 @@ object RetainRemove extends RetainAction
object RetainIgnore extends RetainAction
class Delivery {
- import Delivery._
/**
* Where the delivery is originating from.
*/
- var sender = NO_SENDER
+ var sender = List[DestinationAddress]()
/**
* Total size of the delivery. Used for resource allocation tracking
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/DestinationParser.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/DestinationParser.scala?rev=1375111&r1=1375110&r2=1375111&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/DestinationParser.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/DestinationParser.scala Mon Aug 20 17:20:40 2012
@@ -45,7 +45,7 @@ class DestinationParser extends PathPars
this
}
- def encode_destination(addresses: Array[_ <: DestinationAddress]): String = {
+ def encode_destination(addresses: Seq[_ <: DestinationAddress]): String = {
if (addresses == null) {
null
} else {
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala?rev=1375111&r1=1375110&r2=1375111&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala Mon Aug 20 17:20:40 2012
@@ -78,7 +78,7 @@ class QueueEntry(val queue:Queue, val se
}
def init(qer:QueueEntryRecord):QueueEntry = {
- val sender = qer.sender.map[DestinationAddress, Array[DestinationAddress]](x=> SimpleAddress(x.utf8().toString))
+ val sender = qer.sender.map(x=> SimpleAddress(x.utf8().toString))
state = new Swapped(qer.message_key, qer.message_locator, qer.size, qer.expiration, qer.redeliveries, null, sender)
this
}
@@ -254,7 +254,7 @@ class QueueEntry(val queue:Queue, val se
def message_locator: AtomicReference[Object] = null
- def sender: Array[DestinationAddress] = Delivery.NO_SENDER
+ def sender = List[DestinationAddress]()
/**
* Attempts to dispatch the current entry to the subscriptions position at the entry.
@@ -547,8 +547,8 @@ class QueueEntry(val queue:Queue, val se
if( _browser_copy==null ) {
_browser_copy = delivery.copy
// TODO: perhaps only avoid adding the address in the durable sub case..
- if( _browser_copy.sender.length == 0 ) {
- _browser_copy.sender = append(_browser_copy.sender, queue.address)
+ if( _browser_copy.sender == Nil ) {
+ _browser_copy.sender ::= queue.address
}
}
_browser_copy
@@ -600,8 +600,8 @@ class QueueEntry(val queue:Queue, val se
val acquiredQueueEntry = sub.acquire(entry)
val acquiredDelivery = delivery.copy
- if( acquiredDelivery.sender.length == 0 ) {
- acquiredDelivery.sender = append(acquiredDelivery.sender, queue.address)
+ if( acquiredDelivery.sender == Nil) {
+ acquiredDelivery.sender ::= queue.address
}
acquiredDelivery.ack = (consumed, uow)=> {
@@ -653,7 +653,7 @@ class QueueEntry(val queue:Queue, val se
* entry is persisted, it can move into this state. This state only holds onto the
* the massage key so that it can reload the message from the store quickly when needed.
*/
- class Swapped(override val message_key:Long, override val message_locator:AtomicReference[Object], override val size:Int, override val expiration:Long, var _redeliveries:Short, var acquirer:Subscription, override val sender:Array[DestinationAddress]) extends EntryState {
+ class Swapped(override val message_key:Long, override val message_locator:AtomicReference[Object], override val size:Int, override val expiration:Long, var _redeliveries:Short, var acquirer:Subscription, override val sender:List[DestinationAddress]) extends EntryState {
queue.individual_swapped_items += 1
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala?rev=1375111&r1=1375110&r2=1375111&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala Mon Aug 20 17:20:40 2012
@@ -152,7 +152,7 @@ class Topic(val router:LocalRouter, val
val copy = value.copy();
copy.uow = value.uow
copy.ack = value.ack
- copy.sender = append(copy.sender, address)
+ copy.sender ::= address
downstream.offer(copy)
}
}
@@ -417,7 +417,7 @@ class Topic(val router:LocalRouter, val
val r = retained_message
if (r != null) {
val copy = r.copy()
- copy.sender = append(copy.sender, address)
+ copy.sender ::= address
val producer = new DeliveryProducerRoute(router) {
refiller = NOOP
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala?rev=1375111&r1=1375110&r2=1375111&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala Mon Aug 20 17:20:40 2012
@@ -2,6 +2,8 @@ package org.apache.activemq.apollo.broke
import java.io.{OutputStream, InputStream}
import org.fusesource.hawtbuf.Buffer
+import org.apache.activemq.apollo.broker.DestinationAddress
+import collection.mutable.ListBuffer
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
@@ -90,8 +92,8 @@ object PBSupport {
}
pb
}
- val EMPTY_BUFFER_ARRAY = Array[Buffer]()
implicit def from_pb(pb: QueueEntryPB.Getter):QueueEntryRecord = {
+ import collection.JavaConversions._
val rc = new QueueEntryRecord
rc.queue_key = pb.getQueueKey
rc.entry_seq = pb.getQueueSeq
@@ -102,9 +104,9 @@ object PBSupport {
rc.redeliveries = pb.getRedeliveries.toShort
var senderList = pb.getSenderList
if( senderList!=null ) {
- rc.sender = senderList.toArray(new Array[Buffer](senderList.size()))
+ rc.sender = senderList.toList
} else {
- rc.sender = EMPTY_BUFFER_ARRAY
+ rc.sender = List()
}
rc
}
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.scala?rev=1375111&r1=1375110&r2=1375111&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.scala Mon Aug 20 17:20:40 2012
@@ -21,6 +21,7 @@ package org.apache.activemq.apollo.broke
import org.fusesource.hawtbuf.Buffer
import java.util.concurrent.atomic.AtomicReference
+import collection.mutable.ListBuffer
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -35,6 +36,6 @@ class QueueEntryRecord {
var size = 0
var expiration = 0L
var redeliveries:Short = 0
- var sender:Array[Buffer] = _
+ var sender:List[Buffer] = _
}
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1375111&r1=1375110&r2=1375111&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Mon Aug 20 17:20:40 2012
@@ -450,7 +450,7 @@ class StompProtocolHandler extends Proto
var headers = (MESSAGE_ID -> ascii(session_id.get+message_id_counter)) :: Nil
headers ::= (CONTENT_TYPE -> ascii(content_type))
headers ::= (CONTENT_LENGTH -> ascii(body.length().toString))
- headers ::= (DESTINATION -> encode_header(destination_parser.encode_destination(delivery.sender.head)))
+ headers ::= (DESTINATION -> encode_header(destination_parser.encode_destination(delivery.sender.tail)))
StompFrame(MESSAGE, headers, BufferContent(body))
}
Modified: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util.scala?rev=1375111&r1=1375110&r2=1375111&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util.scala Mon Aug 20 17:20:40 2012
@@ -85,11 +85,4 @@ package object util {
rc.get
}
- def append[T](src:Array[T], value:T)(implicit m: scala.reflect.Manifest[T]) = {
- val rc = new Array[T](src.length+1)
- System.arraycopy(rc, 0, src, 0, src.length)
- rc(src.length) = value
- rc
- }
-
}
\ No newline at end of file