You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/07/20 18:38:21 UTC

svn commit: r1363866 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/proto/ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/ apollo-leveldb/src/main/scala...

Author: chirino
Date: Fri Jul 20 16:38:20 2012
New Revision: 1363866

URL: http://svn.apache.org/viewvc?rev=1363866&view=rev
Log:
Update the queue entry so it can track the history of all the queues it's been in.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.scala
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala
    activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto?rev=1363866&r1=1363865&r2=1363866&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto Fri Jul 20 16:38:20 2012
@@ -51,7 +51,7 @@ message QueueEntryPB {
   optional int32 redeliveries = 6;
   optional sint64 expiration=7;
   optional bytes messageLocator=8;
-  optional bytes sender=9;
+  repeated bytes sender=9;
 }
 
 message MapEntryPB {

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=1363866&r1=1363865&r2=1363866&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Fri Jul 20 16:38:20 2012
@@ -24,6 +24,7 @@ import org.apache.activemq.apollo.util.L
 import java.util.concurrent.atomic.{AtomicReference, AtomicLong}
 import org.apache.activemq.apollo.dto.DestinationDTO
 import org.apache.activemq.apollo.broker.protocol.{ProtocolFactory, Protocol}
+import scala.Array
 
 object DeliveryProducer extends Log
 
@@ -119,6 +120,7 @@ trait Message extends Filterable with Re
  */
 object Delivery extends Sizer[Delivery] {
   def size(value:Delivery):Int = value.size
+  val NO_SENDER = Array[DestinationAddress]()
 }
 
 sealed trait DeliveryResult
@@ -158,11 +160,12 @@ object RetainRemove extends RetainAction
 object RetainIgnore extends RetainAction
 
 class Delivery {
+  import Delivery._
 
   /**
    * Where the delivery is originating from.
    */
-  var sender:DestinationAddress = _
+  var sender = NO_SENDER
 
   /**
    * Total size of the delivery.  Used for resource allocation tracking

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1363866&r1=1363865&r2=1363866&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Fri Jul 20 16:38:20 2012
@@ -1317,11 +1317,7 @@ class QueueEntry(val queue:Queue, val se
   }
 
   def init(qer:QueueEntryRecord):QueueEntry = {
-    val sender = if ( qer.sender==null ) {
-      null
-    } else {
-      SimpleAddress(qer.sender.utf8().toString)
-    }
+    val sender = if( qer.sender==null ) Delivery.NO_SENDER else qer.sender.map[DestinationAddress, Array[DestinationAddress]](x=> SimpleAddress(x.utf8().toString)) // QueueEntryRecord.sender defaults to null when never assigned; treat that as "no senders" instead of throwing a NullPointerException
     state = new Swapped(qer.message_key, qer.message_locator, qer.size, qer.expiration, qer.redeliveries, null, sender)
     this
   }
@@ -1387,9 +1383,7 @@ class QueueEntry(val queue:Queue, val se
     qer.message_locator = state.message_locator
     qer.size = state.size
     qer.expiration = expiration
-    if( state.sender!=null ) {
-      qer.sender = new UTF8Buffer(state.sender.toString)
-    }
+    qer.sender = state.sender.map(x=> new UTF8Buffer(x.toString))
     qer
   }
 
@@ -1499,11 +1493,11 @@ class QueueEntry(val queue:Queue, val se
 
     def message_locator: AtomicReference[Object] = null
 
-    def sender: DestinationAddress = null
+    def sender: Array[DestinationAddress] = Delivery.NO_SENDER
 
     /**
      * Attempts to dispatch the current entry to the subscriptions position at the entry.
-     * @returns true if at least one subscription advanced to the next entry as a result of dispatching.
+     * @return true if at least one subscription advanced to the next entry as a result of dispatching.
      */
     def dispatch() = false
 
@@ -1791,8 +1785,9 @@ class QueueEntry(val queue:Queue, val se
       def browser_copy = {
         if( _browser_copy==null ) {
           _browser_copy = delivery.copy
-          if( _browser_copy.sender==null ) {
-            _browser_copy.sender = queue.address
+          // TODO: perhaps only avoid adding the address in the durable sub case..
+          if( _browser_copy.sender.length == 0 ) {
+            _browser_copy.sender = append(_browser_copy.sender, queue.address)
           }
         }
         _browser_copy
@@ -1844,8 +1839,8 @@ class QueueEntry(val queue:Queue, val se
 
                   val acquiredQueueEntry = sub.acquire(entry)
                   val acquiredDelivery = delivery.copy
-                  if( acquiredDelivery.sender==null ) {
-                    acquiredDelivery.sender = queue.address
+                  if( acquiredDelivery.sender.length == 0 ) {
+                    acquiredDelivery.sender = append(acquiredDelivery.sender, queue.address)
                   }
 
                   acquiredDelivery.ack = (consumed, uow)=> {
@@ -1897,7 +1892,7 @@ class QueueEntry(val queue:Queue, val se
    * entry is persisted, it can move into this state.  This state only holds onto the
    * the massage key so that it can reload the message from the store quickly when needed.
    */
-  class Swapped(override val message_key:Long, override val message_locator:AtomicReference[Object], override val size:Int, override val expiration:Long, var _redeliveries:Short, var acquirer:Subscription, override  val sender:DestinationAddress) extends EntryState {
+  class Swapped(override val message_key:Long, override val message_locator:AtomicReference[Object], override val size:Int, override val expiration:Long, var _redeliveries:Short, var acquirer:Subscription, override  val sender:Array[DestinationAddress]) extends EntryState {
 
     queue.individual_swapped_items += 1
 

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala?rev=1363866&r1=1363865&r2=1363866&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala Fri Jul 20 16:38:20 2012
@@ -152,7 +152,7 @@ class Topic(val router:LocalRouter, val 
       val copy = value.copy();
       copy.uow = value.uow
       copy.ack = value.ack
-      copy.sender = address
+      copy.sender = append(copy.sender, address)
       downstream.offer(copy)
     }
   }
@@ -410,7 +410,7 @@ class Topic(val router:LocalRouter, val 
     val r = retained_message
     if (r != null) {
       val copy = r.copy()
-      copy.sender = address
+      copy.sender = append(copy.sender, address)
 
       val producer = new  DeliveryProducerRoute(router) {
         refiller = NOOP

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala?rev=1363866&r1=1363865&r2=1363866&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala Fri Jul 20 16:38:20 2012
@@ -85,10 +85,12 @@ object PBSupport {
       pb.setExpiration(v.expiration)
     if(v.redeliveries!=0)
       pb.setRedeliveries(v.redeliveries)
-    pb.setSender(v.sender)
+    if ( v.sender!=null ) {
+      v.sender.foreach(pb.addSender(_))
+    }
     pb
   }
-
+  val EMPTY_BUFFER_ARRAY = Array[Buffer]()
   implicit def from_pb(pb: QueueEntryPB.Getter):QueueEntryRecord = {
     val rc = new QueueEntryRecord
     rc.queue_key = pb.getQueueKey
@@ -98,7 +100,12 @@ object PBSupport {
     rc.size = pb.getSize
     rc.expiration = pb.getExpiration
     rc.redeliveries = pb.getRedeliveries.toShort
-    rc.sender = pb.getSender
+    var senderList = pb.getSenderList
+    if( senderList!=null ) {
+      rc.sender = senderList.toArray(new Array[Buffer](senderList.size()))
+    } else {
+      rc.sender = EMPTY_BUFFER_ARRAY
+    }
     rc
   }
 

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.scala?rev=1363866&r1=1363865&r2=1363866&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.scala Fri Jul 20 16:38:20 2012
@@ -35,6 +35,6 @@ class QueueEntryRecord {
   var size = 0
   var expiration = 0L
   var redeliveries:Short = 0
-  var sender:Buffer = _
+  var sender:Array[Buffer] = _
 
 }

Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala?rev=1363866&r1=1363865&r2=1363866&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala Fri Jul 20 16:38:20 2012
@@ -884,7 +884,9 @@ class LevelDBClient(store: LevelDBStore)
 
                           val log_record = new QueueEntryPB.Bean
                           // TODO: perhaps we should normalize the sender to make the index entries more compact.
-                          log_record.setSender(entry.sender)
+                          if( entry.sender!=null ) {
+                            entry.sender.foreach(log_record.addSender(_))
+                          }
                           log_record.setMessageLocator(locator_buffer)
                           log_record.setQueueKey(entry.queue_key)
                           log_record.setQueueSeq(entry.entry_seq)

Modified: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util.scala?rev=1363866&r1=1363865&r2=1363866&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util.scala Fri Jul 20 16:38:20 2012
@@ -19,6 +19,7 @@ package org.apache.activemq.apollo
 import org.fusesource.hawtdispatch._
 import org.fusesource.hawtdispatch.Future
 import java.util.concurrent.CountDownLatch
+import java.util
 
 /**
  *
@@ -82,4 +83,11 @@ package object util {
     rc.get
   }
 
+  def append[T](src:Array[T], value:T)(implicit m: scala.reflect.Manifest[T]) = { // returns a NEW array holding src's elements followed by value; src itself is left unmodified
+    val rc = new Array[T](src.length+1)
+    System.arraycopy(src, 0, rc, 0, src.length) // argument order is (source, sourcePos, dest, destPos, length): copy the existing elements INTO rc
+    rc(src.length) = value
+    rc
+  }
+
 }
\ No newline at end of file