You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/07/20 18:38:21 UTC
svn commit: r1363866 - in /activemq/activemq-apollo/trunk:
apollo-broker/src/main/proto/
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/
apollo-leveldb/src/main/scala...
Author: chirino
Date: Fri Jul 20 16:38:20 2012
New Revision: 1363866
URL: http://svn.apache.org/viewvc?rev=1363866&view=rev
Log:
Update the queue entry so it can track the history of all the queues it's been in.
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.scala
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala
activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util.scala
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto?rev=1363866&r1=1363865&r2=1363866&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto Fri Jul 20 16:38:20 2012
@@ -51,7 +51,7 @@ message QueueEntryPB {
optional int32 redeliveries = 6;
optional sint64 expiration=7;
optional bytes messageLocator=8;
- optional bytes sender=9;
+ repeated bytes sender=9;
}
message MapEntryPB {
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=1363866&r1=1363865&r2=1363866&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Fri Jul 20 16:38:20 2012
@@ -24,6 +24,7 @@ import org.apache.activemq.apollo.util.L
import java.util.concurrent.atomic.{AtomicReference, AtomicLong}
import org.apache.activemq.apollo.dto.DestinationDTO
import org.apache.activemq.apollo.broker.protocol.{ProtocolFactory, Protocol}
+import scala.Array
object DeliveryProducer extends Log
@@ -119,6 +120,7 @@ trait Message extends Filterable with Re
*/
object Delivery extends Sizer[Delivery] {
def size(value:Delivery):Int = value.size
+ val NO_SENDER = Array[DestinationAddress]()
}
sealed trait DeliveryResult
@@ -158,11 +160,12 @@ object RetainRemove extends RetainAction
object RetainIgnore extends RetainAction
class Delivery {
+ import Delivery._
/**
* Where the delivery is originating from.
*/
- var sender:DestinationAddress = _
+ var sender = NO_SENDER
/**
* Total size of the delivery. Used for resource allocation tracking
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1363866&r1=1363865&r2=1363866&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Fri Jul 20 16:38:20 2012
@@ -1317,11 +1317,7 @@ class QueueEntry(val queue:Queue, val se
}
def init(qer:QueueEntryRecord):QueueEntry = {
- val sender = if ( qer.sender==null ) {
- null
- } else {
- SimpleAddress(qer.sender.utf8().toString)
- }
+ val sender = qer.sender.map[DestinationAddress, Array[DestinationAddress]](x=> SimpleAddress(x.utf8().toString))
state = new Swapped(qer.message_key, qer.message_locator, qer.size, qer.expiration, qer.redeliveries, null, sender)
this
}
@@ -1387,9 +1383,7 @@ class QueueEntry(val queue:Queue, val se
qer.message_locator = state.message_locator
qer.size = state.size
qer.expiration = expiration
- if( state.sender!=null ) {
- qer.sender = new UTF8Buffer(state.sender.toString)
- }
+ qer.sender = state.sender.map(x=> new UTF8Buffer(x.toString))
qer
}
@@ -1499,11 +1493,11 @@ class QueueEntry(val queue:Queue, val se
def message_locator: AtomicReference[Object] = null
- def sender: DestinationAddress = null
+ def sender: Array[DestinationAddress] = Delivery.NO_SENDER
/**
* Attempts to dispatch the current entry to the subscriptions position at the entry.
- * @returns true if at least one subscription advanced to the next entry as a result of dispatching.
+ * @return true if at least one subscription advanced to the next entry as a result of dispatching.
*/
def dispatch() = false
@@ -1791,8 +1785,9 @@ class QueueEntry(val queue:Queue, val se
def browser_copy = {
if( _browser_copy==null ) {
_browser_copy = delivery.copy
- if( _browser_copy.sender==null ) {
- _browser_copy.sender = queue.address
+ // TODO: perhaps only avoid adding the address in the durable sub case..
+ if( _browser_copy.sender.length == 0 ) {
+ _browser_copy.sender = append(_browser_copy.sender, queue.address)
}
}
_browser_copy
@@ -1844,8 +1839,8 @@ class QueueEntry(val queue:Queue, val se
val acquiredQueueEntry = sub.acquire(entry)
val acquiredDelivery = delivery.copy
- if( acquiredDelivery.sender==null ) {
- acquiredDelivery.sender = queue.address
+ if( acquiredDelivery.sender.length == 0 ) {
+ acquiredDelivery.sender = append(acquiredDelivery.sender, queue.address)
}
acquiredDelivery.ack = (consumed, uow)=> {
@@ -1897,7 +1892,7 @@ class QueueEntry(val queue:Queue, val se
* entry is persisted, it can move into this state. This state only holds onto the
* the massage key so that it can reload the message from the store quickly when needed.
*/
- class Swapped(override val message_key:Long, override val message_locator:AtomicReference[Object], override val size:Int, override val expiration:Long, var _redeliveries:Short, var acquirer:Subscription, override val sender:DestinationAddress) extends EntryState {
+ class Swapped(override val message_key:Long, override val message_locator:AtomicReference[Object], override val size:Int, override val expiration:Long, var _redeliveries:Short, var acquirer:Subscription, override val sender:Array[DestinationAddress]) extends EntryState {
queue.individual_swapped_items += 1
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala?rev=1363866&r1=1363865&r2=1363866&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala Fri Jul 20 16:38:20 2012
@@ -152,7 +152,7 @@ class Topic(val router:LocalRouter, val
val copy = value.copy();
copy.uow = value.uow
copy.ack = value.ack
- copy.sender = address
+ copy.sender = append(copy.sender, address)
downstream.offer(copy)
}
}
@@ -410,7 +410,7 @@ class Topic(val router:LocalRouter, val
val r = retained_message
if (r != null) {
val copy = r.copy()
- copy.sender = address
+ copy.sender = append(copy.sender, address)
val producer = new DeliveryProducerRoute(router) {
refiller = NOOP
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala?rev=1363866&r1=1363865&r2=1363866&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala Fri Jul 20 16:38:20 2012
@@ -85,10 +85,12 @@ object PBSupport {
pb.setExpiration(v.expiration)
if(v.redeliveries!=0)
pb.setRedeliveries(v.redeliveries)
- pb.setSender(v.sender)
+ if ( v.sender!=null ) {
+ v.sender.foreach(pb.addSender(_))
+ }
pb
}
-
+ val EMPTY_BUFFER_ARRAY = Array[Buffer]()
implicit def from_pb(pb: QueueEntryPB.Getter):QueueEntryRecord = {
val rc = new QueueEntryRecord
rc.queue_key = pb.getQueueKey
@@ -98,7 +100,12 @@ object PBSupport {
rc.size = pb.getSize
rc.expiration = pb.getExpiration
rc.redeliveries = pb.getRedeliveries.toShort
- rc.sender = pb.getSender
+ var senderList = pb.getSenderList
+ if( senderList!=null ) {
+ rc.sender = senderList.toArray(new Array[Buffer](senderList.size()))
+ } else {
+ rc.sender = EMPTY_BUFFER_ARRAY
+ }
rc
}
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.scala?rev=1363866&r1=1363865&r2=1363866&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.scala Fri Jul 20 16:38:20 2012
@@ -35,6 +35,6 @@ class QueueEntryRecord {
var size = 0
var expiration = 0L
var redeliveries:Short = 0
- var sender:Buffer = _
+ var sender:Array[Buffer] = _
}
Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala?rev=1363866&r1=1363865&r2=1363866&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala Fri Jul 20 16:38:20 2012
@@ -884,7 +884,9 @@ class LevelDBClient(store: LevelDBStore)
val log_record = new QueueEntryPB.Bean
// TODO: perhaps we should normalize the sender to make the index entries more compact.
- log_record.setSender(entry.sender)
+ if( entry.sender!=null ) {
+ entry.sender.foreach(log_record.addSender(_))
+ }
log_record.setMessageLocator(locator_buffer)
log_record.setQueueKey(entry.queue_key)
log_record.setQueueSeq(entry.entry_seq)
Modified: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util.scala?rev=1363866&r1=1363865&r2=1363866&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util.scala Fri Jul 20 16:38:20 2012
@@ -19,6 +19,7 @@ package org.apache.activemq.apollo
import org.fusesource.hawtdispatch._
import org.fusesource.hawtdispatch.Future
import java.util.concurrent.CountDownLatch
+import java.util
/**
*
@@ -82,4 +83,11 @@ package object util {
rc.get
}
+ def append[T](src:Array[T], value:T)(implicit m: scala.reflect.Manifest[T]) = {
+ val rc = new Array[T](src.length+1)
+ System.arraycopy(rc, 0, src, 0, src.length)
+ rc(src.length) = value
+ rc
+ }
+
}
\ No newline at end of file