You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2017/06/07 07:51:40 UTC

qpid-broker-j git commit: QPID-7812: [AMQP1.0] Wire up the consumer stats unacknowledged messages/count and session consumer count

Repository: qpid-broker-j
Updated Branches:
  refs/heads/master 57e647cf9 -> f5c42de05


QPID-7812: [AMQP1.0] Wire up the consumer stats unacknowledged messages/count and session consumer count


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/f5c42de0
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/f5c42de0
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/f5c42de0

Branch: refs/heads/master
Commit: f5c42de05080e4129d0bcbf3ea06ff5bee892924
Parents: 57e647c
Author: Keith Wall <kw...@apache.org>
Authored: Mon Jun 5 15:56:33 2017 +0100
Committer: Keith Wall <kw...@apache.org>
Committed: Wed Jun 7 08:44:21 2017 +0100

----------------------------------------------------------------------
 .../protocol/v1_0/ConsumerTarget_1_0.java       | 59 +++++++++++++++++---
 .../qpid/server/protocol/v1_0/Session_1_0.java  |  3 +-
 2 files changed, 51 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f5c42de0/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
index e806874..beb1a78 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -64,11 +65,14 @@ import org.apache.qpid.server.transport.ProtocolEngine;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.StateChangeListener;
 
 class ConsumerTarget_1_0 extends AbstractConsumerTarget<ConsumerTarget_1_0>
 {
     private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerTarget_1_0.class);
     private final boolean _acquires;
+    private final AtomicLong _unacknowledgedCount = new AtomicLong(0);
+    private final AtomicLong _unacknowledgedBytes = new AtomicLong(0);
 
     private long _deliveryTag = 0L;
 
@@ -76,7 +80,25 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget<ConsumerTarget_1_0>
     private final AMQPDescribedTypeRegistry _typeRegistry;
     private SendingLinkEndpoint _linkEndpoint;
     private final SectionEncoder _sectionEncoder;
-    private boolean _queueEmpty;
+
+    private final StateChangeListener<MessageInstance, MessageInstance.EntryState> _unacknowledgedMessageListener = new StateChangeListener<MessageInstance, MessageInstance.EntryState>()
+    {
+        @Override
+        public void stateChanged(MessageInstance entry, MessageInstance.EntryState oldState, MessageInstance.EntryState newState)
+        {
+            if (isConsumerAcquiredStateForThis(oldState) && !isConsumerAcquiredStateForThis(newState))
+            {
+                removeUnacknowledgedMessage(entry);
+                entry.removeStateChangeListener(this);
+            }
+        }
+
+        private boolean isConsumerAcquiredStateForThis(MessageInstance.EntryState state)
+        {
+            return state instanceof MessageInstance.ConsumerAcquiredState
+                   && ((MessageInstance.ConsumerAcquiredState) state).getConsumer().getTarget() == ConsumerTarget_1_0.this;
+        }
+    };
 
     public ConsumerTarget_1_0(final SendingLinkEndpoint linkEndpoint, boolean acquires)
     {
@@ -212,9 +234,16 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget<ConsumerTarget_1_0>
                 }
                 else
                 {
-                    UnsettledAction action = _acquires
-                            ? new DispositionAction(tag, entry, consumer)
-                            : new DoNothingAction();
+                    final UnsettledAction action;
+                    if (_acquires)
+                    {
+                        action = new DispositionAction(tag, entry, consumer);
+                        addUnacknowledgedMessage(entry);
+                    }
+                    else
+                    {
+                        action = new DoNothingAction();
+                    }
 
                     _linkEndpoint.addUnsettled(tag, action, entry);
                 }
@@ -233,11 +262,12 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget<ConsumerTarget_1_0>
                     {
                         txn.addPostTransactionAction(new ServerTransaction.Action()
                         {
-
+                            @Override
                             public void postCommit()
                             {
                             }
 
+                            @Override
                             public void onRollback()
                             {
                                 entry.release(consumer);
@@ -591,6 +621,19 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget<ConsumerTarget_1_0>
         }
     }
 
+    private void addUnacknowledgedMessage(MessageInstance entry)
+    {
+        _unacknowledgedCount.incrementAndGet();
+        _unacknowledgedBytes.addAndGet(entry.getMessage().getSizeIncludingHeader());
+        entry.addStateChangeListener(_unacknowledgedMessageListener);
+    }
+
+    private void removeUnacknowledgedMessage(MessageInstance entry)
+    {
+        _unacknowledgedBytes.addAndGet(-entry.getMessage().getSizeIncludingHeader());
+        _unacknowledgedCount.decrementAndGet();
+    }
+
     private class DoNothingAction implements UnsettledAction
     {
         public DoNothingAction()
@@ -617,15 +660,13 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget<ConsumerTarget_1_0>
     @Override
     public long getUnacknowledgedBytes()
     {
-        // TODO
-        return 0;
+        return _unacknowledgedBytes.get();
     }
 
     @Override
     public long getUnacknowledgedMessages()
     {
-        // TODO
-        return 0;
+        return _unacknowledgedCount.get();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f5c42de0/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index 07a228e..f75df81 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -1431,8 +1431,7 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
     @Override
     public long getConsumerCount()
     {
-        // TODO - fix statistic - need to count consumers
-        return -1;
+        return _consumers.size();
     }
 
     @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org