You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 06:11:16 UTC

svn commit: r961151 - /activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala

Author: chirino
Date: Wed Jul  7 04:11:16 2010
New Revision: 961151

URL: http://svn.apache.org/viewvc?rev=961151&view=rev
Log:
Improving the state management of the queue entries.

Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=961151&r1=961150&r2=961151&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Jul  7 04:11:16 2010
@@ -1,5 +1,5 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
@@ -81,25 +81,19 @@ class Queue(val host: VirtualHost, val d
   ack_source.setEventHandler(^ {drain_acks});
   ack_source.resume
 
-  val store_load_source = createSource(new ListEventAggregator[(QueueEntry, MessageRecord)](), dispatchQueue)
-  store_load_source.setEventHandler(^ {drain_store_loads});
-  store_load_source.resume
-
-  val store_flush_source = createSource(new ListEventAggregator[QueueEntry](), dispatchQueue)
-  store_flush_source.setEventHandler(^ {drain_store_flushes});
-  store_flush_source.resume
-
   val session_manager = new SinkMux[Delivery](messages, dispatchQueue, Delivery)
 
   // sequence numbers.. used to track what's in the store.
   var message_seq_counter = 1L
 
-  val headEntry = new QueueEntry(this, 0L).tombstone
-  var tailEntry = new QueueEntry(this, next_message_seq)
-
   var counter = 0
+
   val entries = new LinkedNodeList[QueueEntry]()
+  val headEntry = new QueueEntry(this, 0L);
+  var tailEntry = new QueueEntry(this, next_message_seq)
+
   entries.addFirst(headEntry)
+  headEntry.tombstone
 
   var loadingSize = 0
   var flushingSize = 0
@@ -142,7 +136,7 @@ class Queue(val host: VirtualHost, val d
   var dequeue_size = 0L
 
   private var capacity = tune_inbound_buffer
-  private var size = 0
+  var size = 0
 
   schedualSlowConsumerCheck
 
@@ -150,7 +144,7 @@ class Queue(val host: VirtualHost, val d
     this.storeId = storeId
     if( !records.isEmpty ) {
       records.foreach { qer =>
-        val entry = new QueueEntry(Queue.this,qer.queueSeq).flushed(qer)
+        val entry = new QueueEntry(Queue.this,qer.queueSeq).init(qer)
         entries.addLast(entry)
       }
 
@@ -183,10 +177,9 @@ class Queue(val host: VirtualHost, val d
         val entry = tailEntry
         tailEntry = new QueueEntry(Queue.this, next_message_seq)
         val queueDelivery = delivery.copy
-        entry.created(queueDelivery)
+        entry.init(queueDelivery)
         queueDelivery.storeBatch = delivery.storeBatch
 
-        size += entry.size
         entries.addLast(entry)
         counter += 1;
         enqueue_counter += 1
@@ -194,14 +187,14 @@ class Queue(val host: VirtualHost, val d
 
         // Do we need to do a persistent enqueue???
         if (queueDelivery.storeBatch != null) {
-          queueDelivery.storeBatch.enqueue(entry.createQueueEntryRecord)
+          queueDelivery.storeBatch.enqueue(entry.toQueueEntryRecord)
         }
 
 
         def haveQuickConsumer = fastSubs.find( sub=> sub.pos.seq <= entry.seq ).isDefined
 
         var dispatched = false
-        if( entry.prefetched > 0 || haveQuickConsumer ) {
+        if( entry.hasSubs || haveQuickConsumer ) {
           // try to dispatch it directly...
           entry.dispatch
         } else {
@@ -227,15 +220,9 @@ class Queue(val host: VirtualHost, val d
 
     def slowConsumerCheck = {
       if( retained > 0 ) {
-        checkCounter += 1
-
-        // target tune_min_subscription_rate / sec
-        val slowCursorDelta = (((tune_slow_subscription_rate) * tune_slow_check_interval) / 1000).toInt
 
-        var idleConsumerCount = 0
-
-// Handy for periodically looking at the dispatch state...
-//
+        // Handy for periodically looking at the dispatch state...
+//        checkCounter += 1
 //        if( !consumerSubs.isEmpty && (checkCounter%100)==0 ) {
 //          println("using "+size+" out of "+capacity+" buffer space.");
 //          var cur = entries.getHead
@@ -248,6 +235,9 @@ class Queue(val host: VirtualHost, val d
 //          println("tail: "+tailEntry)
 //        }
 
+        // target tune_min_subscription_rate / sec
+        val slowCursorDelta = (((tune_slow_subscription_rate) * tune_slow_check_interval) / 1000).toInt
+        var idleConsumerCount = 0
         fastSubs = Nil
 
         consumerSubs.foreach{ case (consumer, sub)=>
@@ -316,13 +306,13 @@ class Queue(val host: VirtualHost, val d
   }
 
   def ack(entry: QueueEntry, sb:StoreBatch) = {
-    if (entry.ref != -1) {
+    if (entry.messageKey != -1) {
       val storeBatch = if( sb == null ) {
         host.store.createStoreBatch
       } else {
         sb
       }
-      storeBatch.dequeue(entry.createQueueEntryRecord)
+      storeBatch.dequeue(entry.toQueueEntryRecord)
       if( sb == null ) {
         storeBatch.release
       }
@@ -334,7 +324,6 @@ class Queue(val host: VirtualHost, val d
     dequeue_counter += 1
     counter -= 1
     dequeue_size += entry.size
-    size -= entry.size
     entry.tombstone
   }
 
@@ -477,158 +466,78 @@ class Queue(val host: VirtualHost, val d
     }
   }
 
-  def drain_store_loads() = {
-    val data = store_load_source.getData
-    data.foreach { case (entry,flushed) =>
 
-      loadingSize -= entry.size
+  val store_flush_source = createSource(new ListEventAggregator[QueueEntry#Loaded](), dispatchQueue)
+  store_flush_source.setEventHandler(^ {drain_store_flushes});
+  store_flush_source.resume
 
-      val delivery = new Delivery()
-      delivery.message = ProtocolFactory.get(flushed.protocol).decode(flushed.value)
-      delivery.size = flushed.size
-      delivery.storeKey = flushed.key
+  def drain_store_flushes() = {
+    val data = store_flush_source.getData
+    data.foreach { loaded =>
+      loaded.flushed
+    }
+    messages.refiller.run
 
-      entry.loaded(delivery)
+  }
 
-      size += entry.size
+  val store_load_source = createSource(new ListEventAggregator[(QueueEntry#Flushed, MessageRecord)](), dispatchQueue)
+  store_load_source.setEventHandler(^ {drain_store_loads});
+  store_load_source.resume
 
-    }
 
-    data.foreach { case (entry,_) =>
-      if( entry.hasSubs ) {
-        entry.run
-      }
+  def drain_store_loads() = {
+    val data = store_load_source.getData
+    data.foreach { case (flushed,messageRecord) =>
+      flushed.loaded(messageRecord)
     }
-  }
 
-  def drain_store_flushes() = {
-    val data = store_flush_source.getData
-    data.foreach { entry =>
-      flushingSize -= entry.size
-
-      // by the time we get called back, subs my be interested in the entry
-      // or it may have been acked.
-      if( !entry.hasSubs && entry.asLoaded!=null ) {
-        size -= entry.size
-        entry.flushed
+    data.foreach { case (flushed,_) =>
+      if( flushed.entry.hasSubs ) {
+        flushed.entry.run
       }
     }
-    messages.refiller.run
-
   }
 
 }
 
-
 object QueueEntry extends Sizer[QueueEntry] {
-
   def size(value: QueueEntry): Int = value.size
 }
 
 class QueueEntry(val queue:Queue, val seq:Long) extends LinkedNode[QueueEntry] with Comparable[QueueEntry] with Runnable with DispatchLogging {
   override protected def log = Queue
-
   import QueueEntry._
 
+  // Competing subscriptions try to exclusivly aquire the entry.
   var competing:List[Subscription] = Nil
+  // These are subscriptions which will not be exclusivly aquiring the entry.
   var browsing:List[Subscription] = Nil
+  // The number of subscriptions which have requested this entry to be prefetech (held in memory) so that it's
+  // ready for them to get dispatched.
   var prefetched = 0
 
-  var value:EntryType = new Tail
-
-  override def toString = {
-    "{seq: "+seq+", prefetched: "+prefetched+", value: "+value+", competing: "+competing+", browsing: "+browsing+"}"
-  }
+  // The current state of the entry: Tail | Loaded | Flushed | Tombstone
+  var state:EntryState = new Tail
 
-  def createQueueEntryRecord = {
-    val qer = new QueueEntryRecord
-    qer.queueKey = queue.storeId
-    qer.queueSeq = seq
-    qer.messageKey = value.ref
-    qer.size = value.size
-    qer
-  }
 
-  def compareTo(o: QueueEntry) = {
-    (seq - o.seq).toInt
-  }
 
-
-  def created(delivery:Delivery) = {
-    this.value = new Loaded(delivery)
+  def init(delivery:Delivery):QueueEntry = {
+    this.state = new Loaded(delivery)
+    queue.size += size
     this
   }
 
-  def loaded(delivery:Delivery) = {
-    this.value = new Loaded(delivery)
-    this
-  }
-
-  def flushed() = {
-    val loaded = value.asLoaded
-    this.value = new Flushed(loaded.delivery.storeKey, loaded.size)
-    this
-  }
-
-  def flushed(qer:QueueEntryRecord) = {
-    this.value = new Flushed(qer.messageKey, qer.size)
-    this
-  }
-
-  def tombstone = {
-
-    // remove from prefetch counters..
-    var cur = this;
-    while( cur!=null && prefetched > 0 ) {
-      if( cur.hasSubs ) {
-        (cur.browsing ::: cur.competing).foreach { sub => if( sub.prefetched(cur) ) { sub.removePrefetch(cur) } }
-      }
-      cur = cur.getPrevious
-      if( cur == null ) {
-        error("illegal prefetch state detected.")
-      }
-    }
-
-    this.value = new Tombstone()
-    if( seq != 0L ) {
-
-      def merge(lv:QueueEntry, rv:QueueEntry):Unit = {
-        if( lv==null || rv==null) {
-          return
-        }
-
-        val lts = lv.value.asTombstone
-        val rts = rv.value.asTombstone
-
-        if( lts==null ||  rts==null ) {
-          return
-        }
-
-        if( lv.seq + lts.count  == rv.seq ) {
-
-          lts.count += rts.count
-          rts.count = 0
-
-          if( rv.browsing!=Nil || rv.competing!=Nil ){
-            lv.addBrowsing(rv.browsing)
-            lv.addCompeting(rv.competing)
-            rv.browsing = Nil
-            rv.competing = Nil
-          }
-
-          rv.unlink
-        }
-      }
-
-      // Merge adjacent tombstones
-      merge(this, getNext)
-      merge(getPrevious, this)
-    }
+  def init(qer:QueueEntryRecord):QueueEntry = {
+    this.state = new Flushed(qer.messageKey, qer.size)
     this
   }
 
   def hasSubs = !(competing == Nil && browsing == Nil)
 
+  /**
+   * Dispatches this entry to the consumers and continues dispatching subsequent
+   * entries if it has subscriptions which accept the dispatch.
+   */
   def run() = {
     var next = dispatch()
     while( next!=null ) {
@@ -664,126 +573,151 @@ class QueueEntry(val queue:Queue, val se
     entry
   }
 
-  def size = this.value.size
-  def flush = this.value.flush
-  def load = this.value.load
-  def ref = this.value.ref
-
-  def asTombstone = this.value.asTombstone
-  def asFlushed = this.value.asFlushed
-  def asLoaded = this.value.asLoaded
-  def asTail = this.value.asTail
-  def isFlushedOrFlushing = value.isFlushedOrFlushing
 
-  def dispatch():QueueEntry = value.dispatch
+  def compareTo(o: QueueEntry) = {
+    (seq - o.seq).toInt
+  }
 
-  trait EntryType {
-    def size:Int
-    def dispatch():QueueEntry
-    def ref:Long
+  def toQueueEntryRecord = {
+    val qer = new QueueEntryRecord
+    qer.queueKey = queue.storeId
+    qer.queueSeq = seq
+    qer.messageKey = state.messageKey
+    qer.size = state.size
+    qer
+  }
+
+  override def toString = {
+    "{seq: "+seq+", prefetched: "+prefetched+", value: "+state+", competing: "+competing+", browsing: "+browsing+"}"
+  }
+
+  /////////////////////////////////////////////////////
+  //
+  // State delegates..
+  //
+  /////////////////////////////////////////////////////
+
+  // What state is it in?
+  def asTombstone = this.state.asTombstone
+  def asFlushed = this.state.asFlushed
+  def asLoaded = this.state.asLoaded
+  def asTail = this.state.asTail
+
+  // These should not change the current state.
+  def size = this.state.size
+  def messageKey = this.state.messageKey
+  def isFlushedOrFlushing = state.isFlushedOrFlushing
+  def dispatch():QueueEntry = state.dispatch
+
+  // These methods may cause a change in the current state.
+  def flush:QueueEntry = this.state.flush
+  def load:QueueEntry = this.state.load
+  def tombstone:QueueEntry = this.state.tombstone
+
+  trait EntryState {
+
+    final def entry:QueueEntry = QueueEntry.this
 
     def asTail:Tail = null
     def asLoaded:Loaded = null
     def asFlushed:Flushed = null
     def asTombstone:Tombstone = null
 
-    def flush = {}
-    def load = {}
+    def size:Int
+    def dispatch():QueueEntry
+    def messageKey:Long
     def isFlushedOrFlushing = false
-  }
 
-  class Tail extends EntryType {
-    override def asTail:Tail = this
-    def size = 0
-    def ref = -1
+    def load = entry
 
-    def dispatch():QueueEntry = null
-  }
+    def flush = entry
 
-  class Tombstone extends EntryType {
+    def tombstone = {
+      queue.size -= size
 
-    var count = 1L
+      // Update the prefetch counter to reflect that this entry is no longer being prefetched.
+      var cur = entry
+      while( cur!=null && prefetched > 0 ) {
+        if( cur.hasSubs ) {
+          (cur.browsing ::: cur.competing).foreach { sub => if( sub.prefetched(entry) ) { sub.removePrefetch(entry) } }
+        }
+        cur = cur.getPrevious
 
-    def size = 0
-    def ref = -1
+        // Sanity check.. we should always stop before we get to the last entry in the list.
+        assert( cur != null , "illegal prefetch state detected.")
+      }
 
-    override def asTombstone = this
+      // if rv and lv are both adjacent tombstones, then this merges the rv
+      // tombstone into lv, unlinks rv, and returns lv, otherwise it returns
+      // rv.
+      def merge(lv:QueueEntry, rv:QueueEntry):QueueEntry = {
+        if( lv==null || rv==null) {
+          return rv
+        }
 
-    def dispatch():QueueEntry = {
-      val p = nextOrTail
-      p.addBrowsing(browsing)
-      p.addCompeting(competing)
-      browsing = Nil
-      competing = Nil
-      p
-    }
+        val lts = lv.state.asTombstone
+        val rts = rv.state.asTombstone
 
-    override  def toString = { "ts:{ count: "+count+"}" }
+        if( lts==null ||  rts==null ) {
+          return rv
+        }
 
-  }
+        // Sanity check: the the entries are adjacent.. this should
+        // always be the case.
+        if( lv.seq + lts.count  != rv.seq ) {
+          throw new AssertionError("entries are not adjacent.")
+        }
 
-  class Flushed(val ref:Long, val size:Int) extends EntryType {
+        lts.count += rts.count
+        rts.count = 0
 
-    var loading = false
+        if( rv.browsing!=Nil || rv.competing!=Nil ){
+          lv.addBrowsing(rv.browsing)
+          lv.addCompeting(rv.competing)
+          rv.browsing = Nil
+          rv.competing = Nil
+        }
+        rv.unlink
+        return lv
+      }
 
-    override def asFlushed = this
+      state = new Tombstone()
+      merge(entry, getNext)
+      merge(getPrevious, entry)
+    }
 
-    override def isFlushedOrFlushing = true
+  }
 
-    override def toString = { "flushed:{ loading: "+loading+", size:"+size+"}" }
+  /**
+   * This state is used on the last entry of the queue.  It still has not been initialized
+   * with a message, but it may be holding subscriptions.  This state transitions to Loaded
+   * once a message is received.
+   */
+  class Tail extends EntryState {
 
-    // Flushed entries can't be dispatched until
-    // they get loaded.
-    def dispatch():QueueEntry = {
-      if( !loading ) {
-        var remaining = queue.tune_subscription_buffer - size
-        load
+    override def asTail:Tail = this
+    def size = 0
+    def messageKey = -1
+    def dispatch():QueueEntry = null
 
-        // make sure the next few entries are loaded too..
-        var cur = getNext
-        while( remaining>0 && cur!=null ) {
-          remaining -= cur.size
-          val flushed = cur.asFlushed
-          if( flushed!=null && !flushed.loading) {
-            flushed.load
-          }
-          cur = getNext
-        }
+    override  def toString = { "tail" }
 
-      }
-      null
-    }
+    override def load = throw new AssertionError("Tail entry cannot be loaded")
+    override def flush = throw new AssertionError("Tail entry cannot be flushed")
 
-    override def load() = {
-      if( !loading ) {
-        // start loading it back...
-        loading = true
-        queue.loadingSize += size
-        queue.host.store.loadMessage(ref) { delivery =>
-          // pass off to a source so it can aggregate multiple
-          // loads to reduce cross thread synchronization
-          if( delivery.isDefined ) {
-            queue.store_load_source.merge((QueueEntry.this, delivery.get))
-          } else {
-            // Looks like someone else removed the message from the store.. lets just
-            // tombstone this entry now.
-            queue.dispatchQueue {
-              debug("Detected store drop of message key: %d", ref)
-              tombstone
-            }
-          }
-        }
-      }
-    }
   }
 
-  class Loaded(val delivery: Delivery) extends EntryType {
+  /**
+   * This state is used while a message is loaded in memory.  A message must be in this state
+   * before it can be dispatched to a consumer.  It can transition to Flushed or Tombstone.
+   */
+  class Loaded(val delivery: Delivery) extends EntryState {
 
     var aquired = false
-    def ref = delivery.storeKey
+    def messageKey = delivery.storeKey
     def size = delivery.size
     var flushing = false
-    
+
     override def toString = { "loaded:{ flushing: "+flushing+", aquired: "+aquired+", size:"+size+"}" }
 
     override def isFlushedOrFlushing = {
@@ -796,27 +730,51 @@ class QueueEntry(val queue:Queue, val se
       if( delivery.storeKey == -1 ) {
         val tx = queue.host.store.createStoreBatch
         delivery.storeKey = tx.store(delivery.createMessageRecord)
-        tx.enqueue(createQueueEntryRecord)
+        tx.enqueue(toQueueEntryRecord)
         tx.release
       }
     }
 
-    override def flush():Unit = {
+    override def flush() = {
       if( queue.host.store!=null && !flushing ) {
         flushing=true
         queue.flushingSize+=size
-
         if( delivery.storeBatch!=null ) {
           delivery.storeBatch.eagerFlush(^{
-            queue.store_flush_source.merge(QueueEntry.this)
+            queue.store_flush_source.merge(this)
           })
         } else {
           store
-          queue.host.store.flushMessage(ref) {
-            queue.store_flush_source.merge(QueueEntry.this)
+          queue.host.store.flushMessage(messageKey) {
+            queue.store_flush_source.merge(this)
           }
         }
       }
+      entry
+    }
+
+    def flushed() = {
+      if( flushing ) {
+        queue.flushingSize-=size
+        queue.size -= size
+        state = new Flushed(delivery.storeKey, size)
+      }
+    }
+
+    override def load() = {
+      if( flushing ) {
+        flushing = false
+        queue.flushingSize-=size
+      }
+      entry
+    }
+
+    override def tombstone = {
+      if( flushing ) {
+        flushing = false
+        queue.flushingSize-=size
+      }
+      super.tombstone
     }
 
     def dispatch():QueueEntry = {
@@ -858,7 +816,7 @@ class QueueEntry(val queue:Queue, val se
                 competingSlowSubs = competingSlowSubs ::: sub :: Nil
 
                 if( !sub.full ) {
-                  val node = sub.add(QueueEntry.this)
+                  val node = sub.add(entry)
                   val offering = delivery.copy
                   offering.ack = (tx)=> {
                     queue.ack_source.merge((node, tx))
@@ -909,6 +867,125 @@ class QueueEntry(val queue:Queue, val se
     }
   }
 
+
+  /**
+   * Entries which have been deleted get put into the Tombstone state.  Adjacent entries in the
+   * Tombstone state get merged into a single entry.
+   */
+  class Tombstone extends EntryState {
+
+    /** The number of adjacent entries this Tombstone represents. */
+    var count = 1L
+
+    def size = 0
+    def messageKey = -1
+
+    override def asTombstone = this
+
+    /**
+     * Nothing ot dispatch in a Tombstone, move the subscriptions to the next entry.
+     */
+    def dispatch():QueueEntry = {
+      val p = nextOrTail
+      p.addBrowsing(browsing)
+      p.addCompeting(competing)
+      browsing = Nil
+      competing = Nil
+      p
+    }
+
+    override def tombstone = throw new AssertionError("Tombstone entry cannot be tombstoned")
+    override  def toString = { "tombstone:{ count: "+count+"}" }
+
+  }
+
+  /**
+   * Entries in the Flushed state are not holding the referenced messages in memory anymore.
+   * This state can transition to Loaded or Tombstone.
+   *
+   * TODO: Add a new FlushedList state which can be used to merge multiple
+   * adjacent Flushed entries into a single FlushedList state.  This would allow us
+   * to support queues with millions of flushed entries without much memory impact.
+   */
+  class Flushed(val messageKey:Long, val size:Int) extends EntryState {
+
+    var loading = false
+
+    override def asFlushed = this
+
+    override def isFlushedOrFlushing = true
+
+    override def toString = { "flushed:{ loading: "+loading+", size:"+size+"}" }
+
+    // Flushed entries can't be dispatched until
+    // they get loaded.
+    def dispatch():QueueEntry = {
+      if( !loading ) {
+        var remaining = queue.tune_subscription_buffer - size
+        load
+
+        // make sure the next few entries are loaded too..
+        var cur = getNext
+        while( remaining>0 && cur!=null ) {
+          remaining -= cur.size
+          val flushed = cur.asFlushed
+          if( flushed!=null && !flushed.loading) {
+            flushed.load
+          }
+          cur = getNext
+        }
+
+      }
+      null
+    }
+
+    override def load() = {
+      if( !loading ) {
+        // start loading it back...
+        loading = true
+        queue.loadingSize += size
+        queue.host.store.loadMessage(messageKey) { delivery =>
+          // pass off to a source so it can aggregate multiple
+          // loads to reduce cross thread synchronization
+          if( delivery.isDefined ) {
+            queue.store_load_source.merge((this, delivery.get))
+          } else {
+            // Looks like someone else removed the message from the store.. lets just
+            // tombstone this entry now.
+            queue.dispatchQueue {
+              debug("Detected store drop of message key: %d", messageKey)
+              tombstone
+            }
+          }
+        }
+      }
+      entry
+    }
+
+    def loaded(messageRecord:MessageRecord) = {
+      if( loading ) {
+        loading = false
+        queue.loadingSize -= size
+
+        val delivery = new Delivery()
+        delivery.message = ProtocolFactory.get(messageRecord.protocol).decode(messageRecord.value)
+        delivery.size = messageRecord.size
+        delivery.storeKey = messageRecord.key
+
+        queue.size += size
+        state = new Loaded(delivery)
+      }
+    }
+
+
+    override def tombstone = {
+      if( loading ) {
+        loading = false
+        queue.loadingSize -= size
+      }
+      super.tombstone
+    }
+  }
 }
 
 
@@ -971,7 +1048,6 @@ class Subscription(queue:Queue) extends 
   def removePrefetch(value:QueueEntry):Unit = {
     value.prefetched -= 1
     prefetchSize -= value.size
-
     fillPrefetch()
   }