You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/09/02 13:26:04 UTC

svn commit: r1379944 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/

Author: chirino
Date: Sun Sep  2 11:26:04 2012
New Revision: 1379944

URL: http://svn.apache.org/viewvc?rev=1379944&view=rev
Log:
The queue stats on a topic queue could become inconsistent. Also avoid a small edge case where a dequeue on a swapped entry would get dropped if it became part of a swap range.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompMetricsTest.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1379944&r1=1379943&r2=1379944&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Sun Sep  2 11:26:04 2012
@@ -30,6 +30,7 @@ import security.{SecuredResource, Securi
 import org.apache.activemq.apollo.dto._
 import java.util.regex.Pattern
 import collection.mutable.ListBuffer
+import java.util
 
 object Queue extends Log {
   val subcsription_counter = new AtomicInteger(0)
@@ -119,6 +120,12 @@ class Queue(val router: LocalRouter, val
   var tune_swap = true
 
   /**
+   * TODO: see if we can remove this collection.  Don't think it's
+   * actually needed.
+   */
+  val in_flight_removes = new util.HashSet[Long]()
+
+  /**
    * The number max number of swapped queue entries to load
    * for the store at a time.  Note that swapped entries are just
    * reference pointers to the actual messages.  When not loaded,
@@ -595,19 +602,13 @@ class Queue(val router: LocalRouter, val
             case state: entry.Loaded =>
               var next = entry.getNext
               if (!entry.is_acquired) {
-                dequeue_item_counter += 1
-                dequeue_size_counter += entry.size
-                dequeue_ts = now
-                entry.remove
+                entry.dequeue(null)
               }
               next
             case state: entry.Swapped =>
               var next = entry.getNext
               if (!entry.is_acquired) {
-                dequeue_item_counter += 1
-                dequeue_size_counter += entry.size
-                dequeue_ts = now
-                entry.remove
+                entry.dequeue(null)
               }
               next
             case state: entry.SwappedRange =>
@@ -665,12 +666,13 @@ class Queue(val router: LocalRouter, val
         tail_entry = new QueueEntry(Queue.this, next_message_seq)
         val queue_delivery = delivery.copy
         queue_delivery.seq = entry.seq
-        entry.init(queue_delivery)
-        
+
         if( tune_persistent ) {
           queue_delivery.uow = delivery.uow
         }
 
+        entry.init(queue_delivery)
+
         entries.addLast(entry)
         enqueue_item_counter += 1
         enqueue_size_counter += entry.size
@@ -679,15 +681,6 @@ class Queue(val router: LocalRouter, val
         // To decrease the enqueue throttle.
         enqueue_remaining_take(entry.size)
 
-        // Do we need to do a persistent enqueue???
-        val persisted = queue_delivery.uow != null
-        if (persisted) {
-          entry.state match {
-            case state:entry.Loaded => state.store
-            case state:entry.Swapped => delivery.uow.enqueue(entry.toQueueEntryRecord)
-          }
-        }
-
         if( entry.hasSubs ) {
           // try to dispatch it directly...
           entry.dispatch
@@ -697,7 +690,7 @@ class Queue(val router: LocalRouter, val
         if( entry.isLinked ) {
           if( !consumers_keeping_up_historically  ) {
             entry.swap(true)
-          } else if( entry.as_loaded.is_acquired && persisted) {
+          } else if( entry.as_loaded.is_acquired && queue_delivery.uow != null) {
             // If the message as dispatched and it's marked to get persisted anyways,
             // then it's ok if it falls out of memory since we won't need to load it again.
             entry.swap(false)
@@ -705,7 +698,7 @@ class Queue(val router: LocalRouter, val
         }
 
         // release the store batch...
-        if (persisted) {
+        if (queue_delivery.uow != null) {
           queue_delivery.uow.release
           queue_delivery.uow = null
         }
@@ -726,17 +719,10 @@ class Queue(val router: LocalRouter, val
   }
 
   def expired(entry:QueueEntry, dequeue:Boolean=true):Unit = {
-    if(dequeue) {
-      might_unfill {
-        dequeue_item_counter += 1
-        dequeue_size_counter += entry.size
-        dequeue_ts = now
-      }
-    }
-
     expired_ts = now
     expired_item_counter += 1
     expired_size_counter += entry.size
+    entry.dequeue(null)
   }
 
   def display_stats: Unit = {
@@ -806,14 +792,12 @@ class Queue(val router: LocalRouter, val
             // acquired.
             if( !x.is_acquired ) {
               expired(cur)
-              x.remove
             }
           case x:QueueEntry#Loaded =>
             // remove the expired message if it has not been
             // acquired.
             if( !x.is_acquired ) {
               expired(cur)
-              x.remove
             }
           case _ =>
         }
@@ -833,36 +817,51 @@ class Queue(val router: LocalRouter, val
     }
 
     // swap out messages.
-    cur = entries.getHead
+    cur = entries.getHead.getNext
+    var dropping_head_entries = is_topic_queue
     while( cur!=null ) {
       val next = cur.getNext
-      if( cur.prefetched ) {
-        // Prefteched entries need to get loaded..
-        cur.load(consumer_swapped_in)
-      } else {
-        // This is a non-prefetched entry.. entires ahead and behind the
-        // consumer subscriptions.
-        val loaded = cur.as_loaded
-        if( loaded!=null ) {
-          // It's in memory.. perhaps we need to swap it out..
-          if(!consumers_keeping_up_historically) {
-            // Swap out ASAP if consumers are not keeping up..
-            cur.swap(true)
+      if ( dropping_head_entries ) {
+        if( cur.parked.isEmpty ) {
+          if( cur.is_swapped_range ) {
+            cur.load(producer_swapped_in)
+            dropping_head_entries=false
           } else {
-            // Consumers seem to be keeping up.. so we have to be more selective
-            // about what gets swapped out..
-
-            if (cur.memory_space eq producer_swapped_in ) {
-              // Entry will be used soon..
-              cur.load(producer_swapped_in)
-            } else if ( cur.is_acquired ) {
-              // Entry was just used...
-              cur.load(consumer_swapped_in)
-//              cur.swap(false)
-            } else {
-              // Does not look to be anywhere close to the consumer.. so get
-              // rid of it asap.
+            cur.dequeue(null)
+          }
+        } else {
+          cur.load(consumer_swapped_in)
+          dropping_head_entries = false
+        }
+      } else {
+        if( cur.prefetched ) {
+          // Prefteched entries need to get loaded..
+          cur.load(consumer_swapped_in)
+        } else {
+          // This is a non-prefetched entry.. entires ahead and behind the
+          // consumer subscriptions.
+          val loaded = cur.as_loaded
+          if( loaded!=null ) {
+            // It's in memory.. perhaps we need to swap it out..
+            if(!consumers_keeping_up_historically) {
+              // Swap out ASAP if consumers are not keeping up..
               cur.swap(true)
+            } else {
+              // Consumers seem to be keeping up.. so we have to be more selective
+              // about what gets swapped out..
+
+              if (cur.memory_space eq producer_swapped_in ) {
+                // Entry will be used soon..
+                cur.load(producer_swapped_in)
+              } else if ( cur.is_acquired ) {
+                // Entry was just used...
+                cur.load(consumer_swapped_in)
+  //              cur.swap(false)
+              } else {
+                // Does not look to be anywhere close to the consumer.. so get
+                // rid of it asap.
+                cur.swap(true)
+              }
             }
           }
         }
@@ -1052,12 +1051,11 @@ class Queue(val router: LocalRouter, val
       case (entry, consumed, uow) =>
         consumed match {
           case Consumed =>
-//            debug("ack consumed: ("+store_id+","+entry.entry.seq+")")
             entry.ack(uow)
           case Expired=>
 //            debug("ack expired: ("+store_id+","+entry.entry.seq+")")
             entry.entry.queue.expired(entry.entry, false)
-            entry.ack(uow)
+            entry.remove()
           case Delivered =>
             entry.increment_nack
             entry.entry.redelivered

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala?rev=1379944&r1=1379943&r2=1379944&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala Sun Sep  2 11:26:04 2012
@@ -64,15 +64,25 @@ class QueueEntry(val queue:Queue, val se
   }
 
   def init(delivery:Delivery):QueueEntry = {
+
     if( delivery.message == null ) {
       // This must be a swapped out message which has been previously persisted in
       // another queue.  We need to enqueue it to this queue..
       queue.swap_out_size_counter += delivery.size
       queue.swap_out_item_counter += 1
       state = new Swapped(delivery.storeKey, delivery.storeLocator, delivery.size, delivery.expiration, 0, null, delivery.sender)
+      // store it..
+      if( delivery.uow != null ) {
+        delivery.uow.enqueue(toQueueEntryRecord)
+      }
     } else {
       queue.producer_swapped_in += delivery
-      state = new Loaded(delivery, false, queue.producer_swapped_in)
+      val loaded: QueueEntry.this.type#Loaded = new Loaded(delivery, false, queue.producer_swapped_in)
+      state = loaded
+      // store it..
+      if( delivery.uow != null ) {
+        loaded.store
+      }
     }
     this
   }
@@ -195,6 +205,7 @@ class QueueEntry(val queue:Queue, val se
   // These methods may cause a change in the current state.
   def swap(asap:Boolean) = state.swap_out(asap)
   def load(space:MemorySpace) = state.swap_in(space)
+  def dequeue(uow:StoreUOW) = state.dequeue(uow)
   def remove = state.remove
 
   def swapped_range = state.swap_range
@@ -290,14 +301,48 @@ class QueueEntry(val queue:Queue, val se
      * Removes the entry from the queue's linked list of entries.  This gets called
      * as a result of an acquired ack.
      */
-    def remove:Unit = {
+    def dequeue(uow:StoreUOW):Unit = {
+      if (messageKey != -1) {
+        val localuow = if( uow == null ) {
+          queue.virtual_host.store.create_uow
+        } else {
+          uow
+        }
+        localuow.dequeue(entry.toQueueEntryRecord)
+        remove()
+        queue.in_flight_removes.add(seq)
+        localuow.on_complete {
+          queue.dispatch_queue {
+            queue.in_flight_removes.remove(seq)
+            queue.might_unfill {
+              queue.dequeue_item_counter += 1
+              queue.dequeue_size_counter += size
+              queue.dequeue_ts = queue.now
+            }
+          }
+        }
+        if( uow == null ) {
+          localuow.release
+        }
+      } else {
+        queue.might_unfill {
+          remove()
+          queue.dequeue_item_counter += 1
+          queue.dequeue_size_counter += size
+          queue.dequeue_ts = queue.now
+        }
+      }
+    }
+
+    def remove():Unit = {
       // advance subscriptions that were on this entry..
-      advance(parked)
-      parked = Nil
+      if( !parked.isEmpty ) {
+        advance(parked)
+        parked = Nil
+      }
 
       // take the entry of the entries list..
       unlink
-      //TODO: perhaps refill subscriptions.
     }
 
     /**
@@ -337,7 +382,7 @@ class QueueEntry(val queue:Queue, val se
       }
     }
 
-    override def remove = throw new AssertionError("Head entry cannot be removed")
+    override def dequeue(uow:StoreUOW) = throw new AssertionError("Head entry cannot be removed")
     override def swap_in(space:MemorySpace) = throw new AssertionError("Head entry cannot be loaded")
     override def swap_out(asap:Boolean) = throw new AssertionError("Head entry cannot be swapped")
   }
@@ -353,7 +398,7 @@ class QueueEntry(val queue:Queue, val se
     override  def toString = "tail"
     override def as_tail:Tail = this
 
-    override def remove = throw new AssertionError("Tail entry cannot be removed")
+    override def dequeue(uow:StoreUOW) = throw new AssertionError("Tail entry cannot be removed")
     override def swap_in(space:MemorySpace) = throw new AssertionError("Tail entry cannot be loaded")
     override def swap_out(asap:Boolean) = throw new AssertionError("Tail entry cannot be swapped")
 
@@ -401,7 +446,7 @@ class QueueEntry(val queue:Queue, val se
 
     override def redelivered = delivery.redeliveries = ((delivery.redeliveries+1).min(Short.MaxValue)).toShort
 
-    var remove_pending = false
+    var remove_pending:Option[StoreUOW] = None
 
     override def is_swapped_or_swapping_out = {
       swapping_out
@@ -489,11 +534,11 @@ class QueueEntry(val queue:Queue, val se
         if( can_combine_with_prev ) {
           getPrevious.as_swapped_range.combineNext
         }
-        if( remove_pending ) {
-          state.remove
-        } else {
-          queue.loaded_items -= 1
-          queue.loaded_size -= size
+        queue.loaded_items -= 1
+        queue.loaded_size -= size
+
+        if( remove_pending.isDefined ) {
+          state.dequeue(remove_pending.get)
         }
 
         val on_swap_out_copy = on_swap_out
@@ -503,10 +548,10 @@ class QueueEntry(val queue:Queue, val se
         }
 
       } else {
-        if( remove_pending ) {
+        if( remove_pending.isDefined ) {
           delivery.message.release
           space -= delivery
-          super.remove
+          super.dequeue(remove_pending.get)
         }
       }
     }
@@ -520,25 +565,28 @@ class QueueEntry(val queue:Queue, val se
       swapping_out = false
     }
 
-    override def remove = {
-      queue.loaded_items -= 1
-      queue.loaded_size -= size
-      if( storing | remove_pending ) {
-        remove_pending = true
+    override def dequeue(uow:StoreUOW) = {
+      if( storing | remove_pending.isDefined ) {
+        remove_pending = Some(uow)
       } else {
-        delivery.message.release
-        space -= delivery
-        super.remove
+        super.dequeue(uow)
       }
     }
 
+    override def remove() = {
+      queue.loaded_items -= 1
+      queue.loaded_size -= size
+      delivery.message.release
+      space -= delivery
+      super.remove()
+    }
+
     override def dispatch():Boolean = {
 
       queue.assert_executing
 
       if( !is_acquired && expiration != 0 && expiration <= queue.now ) {
         queue.expired(entry)
-        remove
         return true
       }
 
@@ -647,15 +695,7 @@ class QueueEntry(val queue:Queue, val se
 
         // We can drop after dispatch in some cases.
         if( queue.is_topic_queue  && parked.isEmpty && getPrevious.is_head ) {
-          if (messageKey != -1) {
-            val storeBatch = queue.virtual_host.store.create_uow
-            storeBatch.dequeue(toQueueEntryRecord)
-            storeBatch.release
-          }
-          queue.dequeue_item_counter += 1
-          queue.dequeue_size_counter += size
-          queue.dequeue_ts = queue.now
-          remove
+          dequeue(null)
         }
 
         queue.trigger_swap
@@ -763,14 +803,18 @@ class QueueEntry(val queue:Queue, val se
       }
     }
 
-
-    override def remove = {
+    override def dequeue(uow:StoreUOW) = {
       if( space!=null ) {
         space = null
         queue.swapping_in_size -= size
       }
+      super.dequeue(uow)
+    }
+
+
+    override def remove() {
       queue.individual_swapped_items -= 1
-      super.remove
+      super.remove()
     }
 
     override def swap_range = {
@@ -789,7 +833,6 @@ class QueueEntry(val queue:Queue, val se
 
       if( !is_acquired && expiration != 0 && expiration <= queue.now ) {
         queue.expired(entry)
-        remove
         return true
       }
 
@@ -891,9 +934,11 @@ class QueueEntry(val queue:Queue, val se
             val tmpList = new LinkedNodeList[QueueEntry]()
             records.foreach { record =>
               val entry = new QueueEntry(queue, record.entry_seq).init(record)
-              tmpList.addLast(entry)
-              item_count += 1
-              size_count += record.size
+              if( !queue.in_flight_removes.contains(entry.seq) ) {
+                tmpList.addLast(entry)
+                item_count += 1
+                size_count += record.size
+              }
             }
 
             // we may need to adjust the enqueue count if entries

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala?rev=1379944&r1=1379943&r2=1379944&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala Sun Sep  2 11:26:04 2012
@@ -322,21 +322,11 @@ class Subscription(val queue:Queue, val 
 
       total_ack_count += 1
       total_ack_size += entry.size
-      if (entry.messageKey != -1) {
-        val storeBatch = if( uow == null ) {
-          queue.virtual_host.store.create_uow
-        } else {
-          uow
-        }
-        storeBatch.dequeue(entry.toQueueEntryRecord)
-        if( uow == null ) {
-          storeBatch.release
-        }
-      }
-      queue.dequeue_item_counter += 1
-      queue.dequeue_size_counter += entry.size
-      queue.dequeue_ts = queue.now
+      remove()
+      entry.dequeue(uow) // entry size changes to 0
+    }
 
+    def remove():Unit = {
       // removes this entry from the acquired list.
       unlink()
       if( acquired.isEmpty ) {
@@ -347,12 +337,9 @@ class Subscription(val queue:Queue, val 
       acquired_size -= entry.size
 
       val next = entry.nextOrTail
-      entry.remove // entry size changes to 0
-
       queue.trigger_swap
       next.task.run
       check_finish_close
-
     }
 
     def increment_nack = total_nack_count += 1

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompMetricsTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompMetricsTest.scala?rev=1379944&r1=1379943&r2=1379944&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompMetricsTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompMetricsTest.scala Sun Sep  2 11:26:04 2012
@@ -281,3 +281,47 @@ class StompMetricsTest extends StompTest
   }
 
 }
+
+class StompLevelDBMetricsTest extends StompMetricsTest {
+
+  override def broker_config_uri: String = "xml:classpath:apollo-stomp-leveldb.xml"
+
+  test("slow_consumer_policy='queue' /w 1 slow and 1 fast consumer.") {
+    var dest_name = next_id("queued.metrics")
+    val dest = "/topic/"+dest_name
+
+    val fast = new StompClient
+    connect("1.1", fast)
+    subscribe("fast", dest, "auto", c=fast);
+
+    val slow = new StompClient
+    connect("1.1", slow)
+    subscribe("fast", dest, "client", c=slow);
+
+    connect("1.1")
+    for( i <- 1 to 1000 ) {
+      async_send(dest, "%01204d".format(i))
+    }
+
+    for( i <- 1 to 1000 ) {
+      assert_received("%01204d".format(i),c=fast)
+    }
+
+    within(3, SECONDS) {
+      val stat = topic_status(dest_name).metrics
+      stat.queue_items should be >= (0L)
+      stat.swapped_in_items should be <= ( stat.queue_items ) // some of it swapped.
+      stat.enqueue_item_counter should be(1000L)
+    }
+
+    slow.close()
+
+    within(3, SECONDS) {
+      val stat = topic_status(dest_name).metrics
+      stat.queue_items should be (0L)
+      stat.swapped_in_items should be(0L)
+      stat.enqueue_item_counter should be(1000L)
+    }
+  }
+
+}