You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2017/06/07 07:51:40 UTC
qpid-broker-j git commit: QPID-7812: [AMQP1.0] Wire up the consumer stats unacknowledged messages/count and session consumer count
Repository: qpid-broker-j
Updated Branches:
refs/heads/master 57e647cf9 -> f5c42de05
QPID-7812: [AMQP1.0] Wire up the consumer stats unacknowledged messages/count and session consumer count
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/f5c42de0
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/f5c42de0
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/f5c42de0
Branch: refs/heads/master
Commit: f5c42de05080e4129d0bcbf3ea06ff5bee892924
Parents: 57e647c
Author: Keith Wall <kw...@apache.org>
Authored: Mon Jun 5 15:56:33 2017 +0100
Committer: Keith Wall <kw...@apache.org>
Committed: Wed Jun 7 08:44:21 2017 +0100
----------------------------------------------------------------------
.../protocol/v1_0/ConsumerTarget_1_0.java | 59 +++++++++++++++++---
.../qpid/server/protocol/v1_0/Session_1_0.java | 3 +-
2 files changed, 51 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f5c42de0/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
index e806874..beb1a78 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -64,11 +65,14 @@ import org.apache.qpid.server.transport.ProtocolEngine;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.StateChangeListener;
class ConsumerTarget_1_0 extends AbstractConsumerTarget<ConsumerTarget_1_0>
{
private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerTarget_1_0.class);
private final boolean _acquires;
+ private final AtomicLong _unacknowledgedCount = new AtomicLong(0);
+ private final AtomicLong _unacknowledgedBytes = new AtomicLong(0);
private long _deliveryTag = 0L;
@@ -76,7 +80,25 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget<ConsumerTarget_1_0>
private final AMQPDescribedTypeRegistry _typeRegistry;
private SendingLinkEndpoint _linkEndpoint;
private final SectionEncoder _sectionEncoder;
- private boolean _queueEmpty;
+
+ private final StateChangeListener<MessageInstance, MessageInstance.EntryState> _unacknowledgedMessageListener = new StateChangeListener<MessageInstance, MessageInstance.EntryState>()
+ {
+ @Override
+ public void stateChanged(MessageInstance entry, MessageInstance.EntryState oldState, MessageInstance.EntryState newState)
+ {
+ if (isConsumerAcquiredStateForThis(oldState) && !isConsumerAcquiredStateForThis(newState))
+ {
+ removeUnacknowledgedMessage(entry);
+ entry.removeStateChangeListener(this);
+ }
+ }
+
+ private boolean isConsumerAcquiredStateForThis(MessageInstance.EntryState state)
+ {
+ return state instanceof MessageInstance.ConsumerAcquiredState
+ && ((MessageInstance.ConsumerAcquiredState) state).getConsumer().getTarget() == ConsumerTarget_1_0.this;
+ }
+ };
public ConsumerTarget_1_0(final SendingLinkEndpoint linkEndpoint, boolean acquires)
{
@@ -212,9 +234,16 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget<ConsumerTarget_1_0>
}
else
{
- UnsettledAction action = _acquires
- ? new DispositionAction(tag, entry, consumer)
- : new DoNothingAction();
+ final UnsettledAction action;
+ if (_acquires)
+ {
+ action = new DispositionAction(tag, entry, consumer);
+ addUnacknowledgedMessage(entry);
+ }
+ else
+ {
+ action = new DoNothingAction();
+ }
_linkEndpoint.addUnsettled(tag, action, entry);
}
@@ -233,11 +262,12 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget<ConsumerTarget_1_0>
{
txn.addPostTransactionAction(new ServerTransaction.Action()
{
-
+ @Override
public void postCommit()
{
}
+ @Override
public void onRollback()
{
entry.release(consumer);
@@ -591,6 +621,19 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget<ConsumerTarget_1_0>
}
}
+ private void addUnacknowledgedMessage(MessageInstance entry)
+ {
+ _unacknowledgedCount.incrementAndGet();
+ _unacknowledgedBytes.addAndGet(entry.getMessage().getSizeIncludingHeader());
+ entry.addStateChangeListener(_unacknowledgedMessageListener);
+ }
+
+ private void removeUnacknowledgedMessage(MessageInstance entry)
+ {
+ _unacknowledgedBytes.addAndGet(-entry.getMessage().getSizeIncludingHeader());
+ _unacknowledgedCount.decrementAndGet();
+ }
+
private class DoNothingAction implements UnsettledAction
{
public DoNothingAction()
@@ -617,15 +660,13 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget<ConsumerTarget_1_0>
@Override
public long getUnacknowledgedBytes()
{
- // TODO
- return 0;
+ return _unacknowledgedBytes.get();
}
@Override
public long getUnacknowledgedMessages()
{
- // TODO
- return 0;
+ return _unacknowledgedCount.get();
}
@Override
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f5c42de0/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index 07a228e..f75df81 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -1431,8 +1431,7 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
@Override
public long getConsumerCount()
{
- // TODO - fix statistic - need to count consumers
- return -1;
+ return _consumers.size();
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org