You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2012/03/24 19:14:20 UTC
svn commit: r1304877 - in
/qpid/branches/java-config-and-management/qpid/java:
broker-plugins/management/src/main/java/resources/
broker-plugins/management/src/main/java/resources/css/
broker-plugins/management/src/main/java/resources/js/ broker/src/ma...
Author: rgodfrey
Date: Sat Mar 24 18:14:19 2012
New Revision: 1304877
URL: http://svn.apache.org/viewvc?rev=1304877&view=rev
Log:
NO-JIRA: [Java Config] More updates to queue management
Modified:
qpid/branches/java-config-and-management/qpid/java/broker-plugins/management/src/main/java/resources/css/queue.css
qpid/branches/java-config-and-management/qpid/java/broker-plugins/management/src/main/java/resources/js/queue.js
qpid/branches/java-config-and-management/qpid/java/broker-plugins/management/src/main/java/resources/queue.html
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ImplicitAcceptDispositionChangeListener.java
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
qpid/branches/java-config-and-management/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
Modified: qpid/branches/java-config-and-management/qpid/java/broker-plugins/management/src/main/java/resources/css/queue.css
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker-plugins/management/src/main/java/resources/css/queue.css?rev=1304877&r1=1304876&r2=1304877&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker-plugins/management/src/main/java/resources/css/queue.css (original)
+++ qpid/branches/java-config-and-management/qpid/java/broker-plugins/management/src/main/java/resources/css/queue.css Sat Mar 24 18:14:19 2012
@@ -1,10 +1,10 @@
#bindings {
- width: 560px;
+ width: 100%;
height: 100px;
}
#consumers {
- width: 560px;
+ width: 100%;
height: 100px;
}
Modified: qpid/branches/java-config-and-management/qpid/java/broker-plugins/management/src/main/java/resources/js/queue.js
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker-plugins/management/src/main/java/resources/js/queue.js?rev=1304877&r1=1304876&r2=1304877&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker-plugins/management/src/main/java/resources/js/queue.js (original)
+++ qpid/branches/java-config-and-management/qpid/java/broker-plugins/management/src/main/java/resources/js/queue.js Sat Mar 24 18:14:19 2012
@@ -29,6 +29,7 @@ require(["dojo/store/JsonRest",
thisObj.grid = new DataGrid({
store: thisObj.dataStore,
structure: structure,
+ autoHeight: true
}, divName);
// since we created this grid programmatically, call startup to render it
@@ -52,11 +53,14 @@ require(["dojo/store/JsonRest",
// iterate over existing store... if not in new data then remove
store.query({ }).forEach(function(object)
{
- for(var i=0; i < data.length; i++)
+ if(data)
{
- if(data[i].id == object.id)
+ for(var i=0; i < data.length; i++)
{
- return;
+ if(data[i].id == object.id)
+ {
+ return;
+ }
}
}
store.remove(object.id);
@@ -64,30 +68,33 @@ require(["dojo/store/JsonRest",
});
// iterate over data...
- for(var i=0; i < data.length; i++)
+ if(data)
{
- if(item = store.get(data[i].id))
+ for(var i=0; i < data.length; i++)
{
- var modified;
- for(var propName in data[i])
+ if(item = store.get(data[i].id))
{
- if(item[ propName ] != data[i][ propName ])
+ var modified;
+ for(var propName in data[i])
{
- item[ propName ] = data[i][ propName ];
- modified = true;
+ if(item[ propName ] != data[i][ propName ])
+ {
+ item[ propName ] = data[i][ propName ];
+ modified = true;
+ }
+ }
+ if(modified)
+ {
+ // ... check attributes for updates
+ store.notify(item, data[i].id);
}
}
- if(modified)
+ else
{
- // ... check attributes for updates
- store.notify(item, data[i].id);
+ // ,,, if not in the store then add
+ store.put(data[i]);
}
}
- else
- {
- // ,,, if not in the store then add
- store.put(data[i]);
- }
}
};
@@ -126,6 +133,56 @@ require(["dojo/store/JsonRest",
}
+ function formatTime(amount)
+ {
+ this.units = "ms";
+ this.value = 0;
+
+ if(amount < 1000)
+ {
+ this.units = "ms";
+ this.value = amount;
+ }
+ else if(amount < 1000 * 60)
+ {
+ this.units = "s";
+ this.value = amount / 1000
+ this.value = this.value.toPrecision(3);
+ }
+ else if(amount < 1000 * 60 * 60)
+ {
+ this.units = "min";
+ this.value = amount / (1000 * 60)
+ this.value = this.value.toPrecision(3);
+ }
+ else if(amount < 1000 * 60 * 60 * 24)
+ {
+ this.units = "hr";
+ this.value = amount / (1000 * 60 * 60)
+ this.value = this.value.toPrecision(3);
+ }
+ else if(amount < 1000 * 60 * 60 * 24 * 7)
+ {
+ this.units = "d";
+ this.value = amount / (1000 * 60 * 60 * 24)
+ this.value = this.value.toPrecision(3);
+ }
+ else if(amount < 1000 * 60 * 60 * 24 * 365)
+ {
+ this.units = "wk";
+ this.value = amount / (1000 * 60 * 60 * 24 * 7)
+ this.value = this.value.toPrecision(3);
+ }
+ else
+ {
+ this.units = "yr";
+ this.value = amount / (1000 * 60 * 60 * 24 * 365)
+ this.value = this.value.toPrecision(3);
+ }
+
+ }
+
+
function QueueUpdater()
{
this.name = dom.byId("name");
@@ -160,8 +217,12 @@ require(["dojo/store/JsonRest",
]);
thisObj.consumersGrid = new UpdatableStore(thisObj.queueData.consumers, "consumers",
- [ { name: "Name", field: "name", width: "120px"},
- { name: "Mode", field: "distributionMode", width: "120px"}
+ [ { name: "Name", field: "name", width: "70px"},
+ { name: "Mode", field: "distributionMode", width: "70px"},
+ { name: "Msgs Rate", field: "msgRate",
+ width: "150px"},
+ { name: "Bytes Rate", field: "bytesRate",
+ width: "150px"}
]);
@@ -209,13 +270,53 @@ require(["dojo/store/JsonRest",
for(var i=0; i < bindings.length; i++)
{
bindings[i].arguments = dojo.toJson(bindings[i].arguments);
+ bindings[i].argumentString = dojo.toJson(bindings[i].arguments);
}
+
+ var consumers = thisObj.queueData[ "consumers" ];
+ if(consumers)
+ {
+ for(var i=0; i < consumers.length; i++)
+ {
+ var stats = consumers[i][ "statistics" ];
+
+ // flatten statistics into attributes
+ for(var propName in stats)
+ {
+ consumers[i][ propName ] = stats[ propName ];
+ }
+ }
+ }
thisObj.updateHeader();
- queueData = data[0];
- stats = queueData[ "statistics" ];
+
+ // update alerting info
+ alertRepeatGap = new formatTime( thisObj.queueData["alertRepeatGap"] );
+
+ dom.byId("alertRepeatGap").innerHTML = alertRepeatGap.value;
+ dom.byId("alertRepeatGapUnits").innerHTML = alertRepeatGap.units;
+
+
+ alertMsgAge = new formatTime( thisObj.queueData["alertThresholdMessageAge"] );
+
+ dom.byId("alertThresholdMessageAge").innerHTML = alertMsgAge.value;
+ dom.byId("alertThresholdMessageAgeUnits").innerHTML = alertMsgAge.units;
+
+ alertMsgSize = new formatBytes( thisObj.queueData["alertThresholdMessageSize"] );
+
+ dom.byId("alertThresholdMessageSize").innerHTML = alertMsgSize.value;
+ dom.byId("alertThresholdMessageSizeUnits").innerHTML = alertMsgSize.units;
+
+ alertQueueDepth = new formatBytes( thisObj.queueData["alertThresholdQueueDepthBytes"] );
+
+ dom.byId("alertThresholdQueueDepthBytes").innerHTML = alertQueueDepth.value;
+ dom.byId("alertThresholdQueueDepthBytesUnits").innerHTML = alertQueueDepth.units;
+
+ dom.byId("alertThresholdQueueDepthMessages").innerHTML = thisObj.queueData["alertThresholdQueueDepthMessages"];
+
+ stats = thisObj.queueData[ "statistics" ];
var sampleTime = new Date();
var messageIn = stats["totalEnqueuedMessages"];
@@ -242,6 +343,32 @@ require(["dojo/store/JsonRest",
dom.byId("bytesOutRate").innerHTML = "(" + bytesOutFormat.value;
dom.byId("bytesOutRateUnits").innerHTML = bytesOutFormat.units + "/s)"
+ if(consumers && thisObj.consumers)
+ {
+ for(var i=0; i < consumers.length; i++)
+ {
+ var consumer = consumers[i];
+ for(var j = 0; j < thisObj.consumers.length; j++)
+ {
+ var oldConsumer = thisObj.consumers[j];
+ if(oldConsumer.id == consumer.id)
+ {
+ var msgRate = (1000 * (consumer.messagesOut - oldConsumer.messagesOut)) /
+ samplePeriod;
+ consumer.msgRate = msgRate.toFixed(0) + "msg/s";
+
+ var bytesRate = (1000 * (consumer.bytesOut - oldConsumer.bytesOut)) /
+ samplePeriod
+ var bytesRateFormat = new formatBytes( bytesRate );
+ consumer.bytesRate = bytesRateFormat.value + bytesRateFormat.units + "/s";
+ }
+
+
+ }
+
+ }
+ }
+
}
thisObj.sampleTime = sampleTime;
@@ -249,8 +376,12 @@ require(["dojo/store/JsonRest",
thisObj.bytesIn = bytesIn;
thisObj.messageOut = messageOut;
thisObj.bytesOut = bytesOut;
+ thisObj.consumers = consumers;
+ // update bindings
thisObj.bindingsGrid.update(thisObj.queueData.bindings)
+
+ // update consumers
thisObj.consumersGrid.update(thisObj.queueData.consumers)
Modified: qpid/branches/java-config-and-management/qpid/java/broker-plugins/management/src/main/java/resources/queue.html
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker-plugins/management/src/main/java/resources/queue.html?rev=1304877&r1=1304876&r2=1304877&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker-plugins/management/src/main/java/resources/queue.html (original)
+++ qpid/branches/java-config-and-management/qpid/java/broker-plugins/management/src/main/java/resources/queue.html Sat Mar 24 18:14:19 2012
@@ -43,14 +43,14 @@
<span id="queueDepthMessages" style="position:absolute; right:9.5em"></span>
<span style="position:absolute; right: 5em; width: 4em"> msgs</span>
<span id="queueDepthBytes" style="position:absolute; right: 3.3em">(</span>
- <span id="queueDepthBytesUnits"style="position:absolute; right: 0em; width: 3em">)</span>
+ <span id="queueDepthBytesUnits" style="position:absolute; right: 0em; width: 3em">)</span>
<br/>
<span style="">State:</span><span id="state" style="position:absolute; left:6em"></span>
<span style="position:absolute; left:26em">Pre-fetched:</span>
- <span id="unacknowledgedMessages" style="position:absolute; right:9.5em">500</span>
+ <span id="unacknowledgedMessages" style="position:absolute; right:9.5em"></span>
<span style="position:absolute; right: 5em; width: 4em"> msgs</span>
- <span id="unacknowledgedBytes" style="position:absolute; right: 3.3em">(256</span>
- <span id="unacknowledgedBytesUnits" style="position:absolute; right: 0em; width: 3em">Kb)</span>
+ <span id="unacknowledgedBytes" style="position:absolute; right: 3.3em"></span>
+ <span id="unacknowledgedBytesUnits" style="position:absolute; right: 0em; width: 3em"></span>
<br/>
<span style="">Durable:</span><span id="durable" style="position:absolute; left:6em"></span>
<span style="position:absolute; left:26em">Inbound:</span>
@@ -76,7 +76,33 @@
</div>
<br/>
<div data-dojo-type="dijit.TitlePane" data-dojo-props="title: 'Alerting Thresholds'">
- TODO
+ <span style="">Max. Queue Size:</span>
+ <span id="alertThresholdQueueDepthMessages"
+ style="position:absolute; left:8em; width:8em; text-align:right"></span>
+ <span style="position:absolute; left:16.2em">msgs</span>
+
+ <span id="alertThresholdQueueDepthBytes"
+ style="position:absolute; left:20em; width:8em; text-align:right"></span>
+ <span id="alertThresholdQueueDepthBytesUnits" style="position:absolute; left:28.2em"></span>
+ <br>
+ <span style="">Max. Message Age:</span>
+ <span id="alertThresholdMessageAge"
+ style="position:absolute; left:8em; width:8em; text-align:right"></span>
+ <span id="alertThresholdMessageAgeUnits" style="position:absolute; left:16.2em"></span>
+
+ <span style="position:absolute; left:21em">Size: </span>
+ <span id="alertThresholdMessageSize"
+ style="position:absolute; left:23em; width:5em; text-align:right"></span>
+ <span id="alertThresholdMessageSizeUnits" style="position:absolute; left:28.2em"></span>
+ <br/>
+ <br/>
+ <span style="">Alert frequency:</span>
+ <span id="alertRepeatGap"
+ style="position:absolute; left:8em; width:8em; text-align:right"></span>
+ <span id="alertRepeatGapUnits" style="position:absolute; left:16.2em"></span>
+
+
+
</div>
</div>
<!--<div data-dojo-type="dijit.layout.ContentPane" data-dojo-props="region:'trailing'">Trailing pane</div>-->
Modified: qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java?rev=1304877&r1=1304876&r2=1304877&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java Sat Mar 24 18:14:19 2012
@@ -33,6 +33,7 @@ public class ConsumerAdapter extends Abs
{
private final Subscription _subscription;
private final QueueAdapter _queue;
+ private final ConsumerStatistics _statistics;
public ConsumerAdapter(final QueueAdapter queueAdapter, final Subscription subscription)
{
@@ -44,6 +45,7 @@ public class ConsumerAdapter extends Abs
_subscription = subscription;
_queue = queueAdapter;
+ _statistics = new ConsumerStatistics();
//TODO
}
@@ -169,6 +171,40 @@ public class ConsumerAdapter extends Abs
public Statistics getStatistics()
{
- return NoStatistics.getInstance();
+ return _statistics;
+ }
+
+ private class ConsumerStatistics implements Statistics
+ {
+
+ public Collection<String> getStatisticNames()
+ {
+ return AVAILABLE_STATISTICS;
+ }
+
+ public Object getStatistic(String name)
+ {
+ if(name.equals(BYTES_OUT))
+ {
+ return _subscription.getBytesOut();
+ }
+ else if(name.equals(MESSAGES_OUT))
+ {
+ return _subscription.getMessagesOut();
+ }
+ else if(name.equals(STATE_CHANGED))
+ {
+
+ }
+ else if(name.equals(UNACKNOWLEDGED_BYTES))
+ {
+ return _subscription.getUnacknowledgedBytes();
+ }
+ else if(name.equals(UNACKNOWLEDGED_MESSAGES))
+ {
+ return _subscription.getUnacknowledgedMessages();
+ }
+ return null; // TODO - Implement
+ }
}
}
Modified: qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ImplicitAcceptDispositionChangeListener.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ImplicitAcceptDispositionChangeListener.java?rev=1304877&r1=1304876&r2=1304877&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ImplicitAcceptDispositionChangeListener.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ImplicitAcceptDispositionChangeListener.java Sat Mar 24 18:14:19 2012
@@ -72,6 +72,10 @@ class ImplicitAcceptDispositionChangeLis
public boolean acquire()
{
boolean acquired = _entry.acquire(getSubscription());
+ if(acquired)
+ {
+ getSubscription().recordUnacknowledged(_entry);
+ }
return acquired;
}
Modified: qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java?rev=1304877&r1=1304876&r2=1304877&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java Sat Mar 24 18:14:19 2012
@@ -33,6 +33,14 @@ public interface Subscription
boolean isTransient();
+ long getBytesOut();
+
+ long getMessagesOut();
+
+ long getUnacknowledgedBytes();
+
+ long getUnacknowledgedMessages();
+
public static enum State
{
ACTIVE,
Modified: qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java?rev=1304877&r1=1304876&r2=1304877&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java Sat Mar 24 18:14:19 2012
@@ -93,8 +93,13 @@ public abstract class SubscriptionImpl i
private LogActor _logActor;
private UUID _id;
private final AtomicLong _deliveredCount = new AtomicLong(0);
- private long _createTime = System.currentTimeMillis();
+ private final AtomicLong _deliveredBytes = new AtomicLong(0);
+
+ private final AtomicLong _unacknowledgedCount = new AtomicLong(0);
+ private final AtomicLong _unacknowledgedBytes = new AtomicLong(0);
+ private long _createTime = System.currentTimeMillis();
+
static final class BrowserSubscription extends SubscriptionImpl
{
@@ -277,22 +282,13 @@ public abstract class SubscriptionImpl i
public void send(QueueEntry entry, boolean batch) throws AMQException
{
- // if we do not need to wait for client acknowledgements
- // we can decrement the reference count immediately.
-
- // By doing this _before_ the send we ensure that it
- // doesn't get sent if it can't be dequeued, preventing
- // duplicate delivery on recovery.
-
- // The send may of course still fail, in which case, as
- // the message is unacked, it will be lost.
-
+
synchronized (getChannel())
{
getChannel().getProtocolSession().setDeferFlush(batch);
long deliveryTag = getChannel().getNextDeliveryTag();
-
+ addUnacknowledgedMessage(entry);
recordMessageDelivery(entry, deliveryTag);
sendToClient(entry, deliveryTag);
@@ -693,6 +689,7 @@ public abstract class SubscriptionImpl i
{
_deliveryMethod.deliverToClient(this,entry,deliveryTag);
_deliveredCount.incrementAndGet();
+ _deliveredBytes.addAndGet(entry.getSize());
}
@@ -828,4 +825,44 @@ public abstract class SubscriptionImpl i
_channel.getProtocolSession().flushBatched();
}
+
+ public long getBytesOut()
+ {
+ return _deliveredBytes.longValue();
+ }
+
+ public long getMessagesOut()
+ {
+ return _deliveredCount.longValue();
+ }
+
+
+ protected void addUnacknowledgedMessage(QueueEntry entry)
+ {
+ final long size = entry.getSize();
+ _unacknowledgedBytes.addAndGet(size);
+ _unacknowledgedCount.incrementAndGet();
+ entry.addStateChangeListener(new QueueEntry.StateChangeListener()
+ {
+ public void stateChanged(QueueEntry entry, QueueEntry.State oldState, QueueEntry.State newState)
+ {
+ if(oldState.equals(QueueEntry.State.ACQUIRED) && !newState.equals(QueueEntry.State.ACQUIRED))
+ {
+ _unacknowledgedBytes.addAndGet(-size);
+ _unacknowledgedCount.decrementAndGet();
+ entry.removeStateChangeListener(this);
+ }
+ }
+ });
+ }
+
+ public long getUnacknowledgedBytes()
+ {
+ return _unacknowledgedBytes.longValue();
+ }
+
+ public long getUnacknowledgedMessages()
+ {
+ return _unacknowledgedCount.longValue();
+ }
}
Modified: qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java?rev=1304877&r1=1304876&r2=1304877&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java Sat Mar 24 18:14:19 2012
@@ -130,6 +130,10 @@ public class Subscription_0_10 implement
private String _trace;
private final long _createTime = System.currentTimeMillis();
private final AtomicLong _deliveredCount = new AtomicLong(0);
+ private final AtomicLong _deliveredBytes = new AtomicLong(0);
+ private final AtomicLong _unacknowledgedCount = new AtomicLong(0);
+ private final AtomicLong _unacknowledgedBytes = new AtomicLong(0);
+
private final Map<String, Object> _arguments;
private int _deferredMessageCredit;
private long _deferredSizeCredit;
@@ -631,10 +635,15 @@ public class Subscription_0_10 implement
_session.sendMessage(xfr, _postIdSettingAction);
entry.incrementDeliveryCount();
_deliveredCount.incrementAndGet();
+ _deliveredBytes.addAndGet(entry.getSize());
if(_acceptMode == MessageAcceptMode.NONE && _acquireMode == MessageAcquireMode.PRE_ACQUIRED)
{
forceDequeue(entry, false);
}
+ else if(_acquireMode == MessageAcquireMode.PRE_ACQUIRED)
+ {
+ recordUnacknowledged(entry);
+ }
}
else
{
@@ -643,6 +652,12 @@ public class Subscription_0_10 implement
}
}
+ void recordUnacknowledged(QueueEntry entry)
+ {
+ _unacknowledgedCount.incrementAndGet();
+ _unacknowledgedBytes.addAndGet(entry.getSize());
+ }
+
private void deferredAddCredit(final int deferredMessageCredit, final long deferredSizeCredit)
{
_deferredMessageCredit += deferredMessageCredit;
@@ -934,6 +949,8 @@ public class Subscription_0_10 implement
// TODO Fix Store Context / cleanup
if(entry.isAcquiredBy(this))
{
+ _unacknowledgedBytes.addAndGet(-entry.getSize());
+ _unacknowledgedCount.decrementAndGet();
entry.discard();
}
}
@@ -1085,4 +1102,24 @@ public class Subscription_0_10 implement
{
_session.getConnection().flush();
}
+
+ public long getBytesOut()
+ {
+ return _deliveredBytes.longValue();
+ }
+
+ public long getMessagesOut()
+ {
+ return _deliveredCount.longValue();
+ }
+
+ public long getUnacknowledgedBytes()
+ {
+ return _unacknowledgedBytes.longValue();
+ }
+
+ public long getUnacknowledgedMessages()
+ {
+ return _unacknowledgedCount.longValue();
+ }
}
Modified: qpid/branches/java-config-and-management/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java?rev=1304877&r1=1304876&r2=1304877&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java Sat Mar 24 18:14:19 2012
@@ -122,6 +122,26 @@ public class MockSubscription implements
return false;
}
+ public long getBytesOut()
+ {
+ return 0; // TODO - Implement
+ }
+
+ public long getMessagesOut()
+ {
+ return 0; // TODO - Implement
+ }
+
+ public long getUnacknowledgedBytes()
+ {
+ return 0; // TODO - Implement
+ }
+
+ public long getUnacknowledgedMessages()
+ {
+ return 0; // TODO - Implement
+ }
+
public AMQQueue getQueue()
{
return queue;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org