You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/02/08 16:06:34 UTC

svn commit: r1241924 - /activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala

Author: chirino
Date: Wed Feb  8 15:06:33 2012
New Revision: 1241924

URL: http://svn.apache.org/viewvc?rev=1241924&view=rev
Log:
Don't combine swap entries while they are loading. Fixes unlink errors.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1241924&r1=1241923&r2=1241924&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Feb  8 15:06:33 2012
@@ -1232,6 +1232,11 @@ class QueueEntry(val queue:Queue, val se
   def is_swapped = as_swapped!=null
   def is_swapped_range = as_swapped_range!=null
   def is_swapped_or_swapped_range = is_swapped || is_swapped_range
+  def is_loading = state match {
+    case state:SwappedRange => state.loading
+    case state:Swapped => state.loading
+    case _ => false
+  }
 
   // These should not change the current state.
   def count = state.count
@@ -1256,7 +1261,7 @@ class QueueEntry(val queue:Queue, val se
     getPrevious !=null &&
       getPrevious.is_swapped_range &&
         ( (is_swapped && !is_acquired) || is_swapped_range ) &&
-          (getPrevious.count + count  < queue.tune_swap_range_size)
+          (getPrevious.count + count  < queue.tune_swap_range_size) && !is_loading
   }
 
   trait EntryState {
@@ -1737,6 +1742,8 @@ class QueueEntry(val queue:Queue, val se
 
     override def toString = { "swapped:{ swapping_in: "+space+", acquired:"+acquirer+", size:"+size+"}" }
 
+    def loading = this.space!=null
+
     override def swap_in(mem_space:MemorySpace) = {
       if( this.space==null ) {
 //        trace("Start entry load of message seq: %s", seq)
@@ -1887,64 +1894,61 @@ class QueueEntry(val queue:Queue, val se
     override def size = _size
     override def expiration = _expiration
 
-    var swapping_in = false
+    var loading = false
 
     override def as_swapped_range = this
 
     override def is_swapped_or_swapping_out = true
 
+
     def label = {
       var rc = "swapped_range"
-      if( swapping_in ) {
+      if( loading ) {
         rc = "swapped_range|swapping in"
       }
       rc
     }
-    override def toString = { "swapped_range:{ swapping_in: "+swapping_in+", count: "+count+", size: "+size+"}" }
+    override def toString = { "swapped_range:{ swapping_in: "+loading+", count: "+count+", size: "+size+"}" }
 
     override def swap_in(space:MemorySpace) = {
-      if( !swapping_in ) {
-        swapping_in = true
+      if( !loading ) {
+        loading = true
         queue.virtual_host.store.list_queue_entries(queue.store_id, seq, last) { records =>
-          if( !records.isEmpty ) {
-            queue.dispatch_queue {
-
-              var item_count=0
-              var size_count=0
-
-              val tmpList = new LinkedNodeList[QueueEntry]()
-              records.foreach { record =>
-                val entry = new QueueEntry(queue, record.entry_seq).init(record)
-                tmpList.addLast(entry)
-                item_count += 1
-                size_count += record.size
-              }
-
-              // we may need to adjust the enqueue count if entries
-              // were dropped at the store level
-              var item_delta = (count - item_count)
-              val size_delta: Int = size - size_count
-
-              if ( item_delta!=0 || size_delta!=0 ) {
-                info("Detected store change in range %d to %d. %d message(s) and %d bytes", seq, last, item_delta, size_delta)
-                queue.enqueue_item_counter += item_delta
-                queue.enqueue_size_counter += size_delta
-              }
+          queue.dispatch_queue {
+            loading  = false
+            assert(isLinked)
+
+            var item_count=0
+            var size_count=0
+
+            val tmpList = new LinkedNodeList[QueueEntry]()
+            records.foreach { record =>
+              val entry = new QueueEntry(queue, record.entry_seq).init(record)
+              tmpList.addLast(entry)
+              item_count += 1
+              size_count += record.size
+            }
 
-              linkAfter(tmpList)
-              val next = getNext
+            // we may need to adjust the enqueue count if entries
+            // were dropped at the store level
+            var item_delta = (count - item_count)
+            val size_delta: Int = size - size_count
+
+            if ( item_delta!=0 || size_delta!=0 ) {
+              info("Detected store change in range %d to %d. %d message(s) and %d bytes", seq, last, item_delta, size_delta)
+              queue.enqueue_item_counter += item_delta
+              queue.enqueue_size_counter += size_delta
+            }
 
-              // move the subs to the first entry that we just loaded.
-              parked.foreach(_.advance(next))
-              next :::= parked
-              queue.trigger_swap
+            linkAfter(tmpList)
+            val next = getNext
 
-              unlink
+            // move the subs to the first entry that we just loaded.
+            parked.foreach(_.advance(next))
+            next :::= parked
+            queue.trigger_swap
 
-              // TODO: refill prefetches
-            }
-          } else {
-            warn("range load failed")
+            unlink
           }
         }
       }
@@ -1958,6 +1962,7 @@ class QueueEntry(val queue:Queue, val se
       assert(value!=null)
       assert(value.is_swapped || value.is_swapped_range)
       assert(!value.is_acquired)
+      assert(!value.is_loading)
       if( value.is_swapped ) {
         assert(last < value.seq )
         last = value.seq