You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2017/09/12 08:01:29 UTC

qpid-broker-j git commit: QPID-7825: [Java Broker] Update statistic after dequeue has taken place

Repository: qpid-broker-j
Updated Branches:
  refs/heads/master 82495db3a -> 6d7ac3680


QPID-7825: [Java Broker] Update the expired-message statistic only after the dequeue has taken place


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/6d7ac368
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/6d7ac368
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/6d7ac368

Branch: refs/heads/master
Commit: 6d7ac3680a2edba92dd99656fb880765a3ecfdcd
Parents: 82495db
Author: Keith Wall <kw...@apache.org>
Authored: Tue Sep 12 08:58:03 2017 +0100
Committer: Keith Wall <kw...@apache.org>
Committed: Tue Sep 12 09:00:45 2017 +0100

----------------------------------------------------------------------
 .../apache/qpid/server/queue/AbstractQueue.java | 34 ++++++++++++--------
 1 file changed, 21 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/6d7ac368/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 416dbfd..79e6032 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -92,7 +92,6 @@ import org.apache.qpid.server.message.MessageInfoImpl;
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.MessageSender;
-import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.message.RejectType;
 import org.apache.qpid.server.message.RoutingResult;
 import org.apache.qpid.server.message.ServerMessage;
@@ -1770,19 +1769,29 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
     @Override
     public void deleteEntry(final QueueEntry entry)
     {
-        boolean acquiredForDequeueing = entry.acquireOrSteal(new Runnable()
-        {
-            @Override
-            public void run()
-            {
-                dequeueEntry(entry);
-            }
-        });
+        deleteEntry(entry, null);
+    }
 
-        if(acquiredForDequeueing)
+    private void deleteEntry(final QueueEntry entry, final Runnable postDequeueTask)
+    {
+        boolean acquiredForDequeueing = entry.acquireOrSteal(() ->
+                                                             {
+                                                                 _logger.debug("Dequeuing stolen node {}", entry);
+                                                                 dequeueEntry(entry);
+                                                                 if (postDequeueTask != null)
+                                                                 {
+                                                                     postDequeueTask.run();
+                                                                 }
+                                                             });
+
+        if (acquiredForDequeueing)
         {
             _logger.debug("Dequeuing node {}", entry);
             dequeueEntry(entry);
+            if (postDequeueTask != null)
+            {
+                postDequeueTask.run();
+            }
         }
     }
 
@@ -2128,8 +2137,8 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
                     expired = false;
                     if (node.acquire())
                     {
-                        _queueStatistics.addToExpired(node.getSizeWithHeader());
                         dequeueEntry(node);
+                        _queueStatistics.addToExpired(node.getSizeWithHeader());
                     }
                 }
 
@@ -2199,8 +2208,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
                 // If the node has expired then acquire it
                 if (node.expired())
                 {
-                    deleteEntry(node);
-                    _queueStatistics.addToExpired(node.getSizeWithHeader());
+                    deleteEntry(node, () -> _queueStatistics.addToExpired(node.getSizeWithHeader()));
                 }
                 else
                 {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org