You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2012/03/24 19:14:20 UTC

svn commit: r1304877 - in /qpid/branches/java-config-and-management/qpid/java: broker-plugins/management/src/main/java/resources/ broker-plugins/management/src/main/java/resources/css/ broker-plugins/management/src/main/java/resources/js/ broker/src/ma...

Author: rgodfrey
Date: Sat Mar 24 18:14:19 2012
New Revision: 1304877

URL: http://svn.apache.org/viewvc?rev=1304877&view=rev
Log:
NO-JIRA: [Java Config] More updtes to queue management

Modified:
    qpid/branches/java-config-and-management/qpid/java/broker-plugins/management/src/main/java/resources/css/queue.css
    qpid/branches/java-config-and-management/qpid/java/broker-plugins/management/src/main/java/resources/js/queue.js
    qpid/branches/java-config-and-management/qpid/java/broker-plugins/management/src/main/java/resources/queue.html
    qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java
    qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ImplicitAcceptDispositionChangeListener.java
    qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
    qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
    qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
    qpid/branches/java-config-and-management/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java

Modified: qpid/branches/java-config-and-management/qpid/java/broker-plugins/management/src/main/java/resources/css/queue.css
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker-plugins/management/src/main/java/resources/css/queue.css?rev=1304877&r1=1304876&r2=1304877&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker-plugins/management/src/main/java/resources/css/queue.css (original)
+++ qpid/branches/java-config-and-management/qpid/java/broker-plugins/management/src/main/java/resources/css/queue.css Sat Mar 24 18:14:19 2012
@@ -1,10 +1,10 @@
 
 #bindings {
-	width: 560px;
+	width: 100%;
 	height: 100px;
 }
 
 #consumers {
-	width: 560px;
+	width: 100%;
 	height: 100px;
 }

Modified: qpid/branches/java-config-and-management/qpid/java/broker-plugins/management/src/main/java/resources/js/queue.js
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker-plugins/management/src/main/java/resources/js/queue.js?rev=1304877&r1=1304876&r2=1304877&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker-plugins/management/src/main/java/resources/js/queue.js (original)
+++ qpid/branches/java-config-and-management/qpid/java/broker-plugins/management/src/main/java/resources/js/queue.js Sat Mar 24 18:14:19 2012
@@ -29,6 +29,7 @@ require(["dojo/store/JsonRest",
                  thisObj.grid = new DataGrid({
                              store: thisObj.dataStore,
                              structure: structure,
+                             autoHeight: true
                                     }, divName);
 
                  // since we created this grid programmatically, call startup to render it
@@ -52,11 +53,14 @@ require(["dojo/store/JsonRest",
                  // iterate over existing store... if not in new data then remove
                  store.query({ }).forEach(function(object)
                      {
-                         for(var i=0; i < data.length; i++)
+                         if(data)
                          {
-                             if(data[i].id == object.id)
+                             for(var i=0; i < data.length; i++)
                              {
-                                 return;
+                                 if(data[i].id == object.id)
+                                 {
+                                     return;
+                                 }
                              }
                          }
                          store.remove(object.id);
@@ -64,30 +68,33 @@ require(["dojo/store/JsonRest",
                      });
 
                  // iterate over data...
-                 for(var i=0; i < data.length; i++)
+                 if(data)
                  {
-                     if(item = store.get(data[i].id))
+                     for(var i=0; i < data.length; i++)
                      {
-                         var modified;
-                         for(var propName in data[i])
+                         if(item = store.get(data[i].id))
                          {
-                             if(item[ propName ] != data[i][ propName ])
+                             var modified;
+                             for(var propName in data[i])
                              {
-                                 item[ propName ] = data[i][ propName ];
-                                 modified = true;
+                                 if(item[ propName ] != data[i][ propName ])
+                                 {
+                                     item[ propName ] = data[i][ propName ];
+                                     modified = true;
+                                 }
+                             }
+                             if(modified)
+                             {
+                                 // ... check attributes for updates
+                                 store.notify(item, data[i].id);
                              }
                          }
-                         if(modified)
+                         else
                          {
-                             // ... check attributes for updates
-                             store.notify(item, data[i].id);
+                             // ,,, if not in the store then add
+                             store.put(data[i]);
                          }
                      }
-                     else
-                     {
-                         // ,,, if not in the store then add
-                         store.put(data[i]);
-                     }
                  }
 
             };
@@ -126,6 +133,56 @@ require(["dojo/store/JsonRest",
          }
 
 
+         function formatTime(amount)
+         {
+            this.units = "ms";
+            this.value = 0;
+
+            if(amount < 1000)
+            {
+                this.units = "ms";
+                this.value = amount;
+            }
+            else if(amount < 1000 * 60)
+            {
+                this.units = "s";
+                this.value = amount / 1000
+                this.value = this.value.toPrecision(3);
+            }
+            else if(amount < 1000 * 60 * 60)
+            {
+                this.units = "min";
+                this.value = amount / (1000 * 60)
+                this.value = this.value.toPrecision(3);
+            }
+            else if(amount < 1000 * 60 * 60 * 24)
+            {
+                this.units = "hr";
+                this.value = amount / (1000 * 60 * 60)
+                this.value = this.value.toPrecision(3);
+            }
+            else if(amount < 1000 * 60 * 60 * 24 * 7)
+            {
+                this.units = "d";
+                this.value = amount / (1000 * 60 * 60 * 24)
+                this.value = this.value.toPrecision(3);
+            }
+            else if(amount < 1000 * 60 * 60 * 24 * 365)
+            {
+                this.units = "wk";
+                this.value = amount / (1000 * 60 * 60 * 24 * 7)
+                this.value = this.value.toPrecision(3);
+            }
+            else
+            {
+                this.units = "yr";
+                this.value = amount / (1000 * 60 * 60 * 24 * 365)
+                this.value = this.value.toPrecision(3);
+            }
+
+         }
+
+
          function QueueUpdater()
          {
             this.name = dom.byId("name");
@@ -160,8 +217,12 @@ require(["dojo/store/JsonRest",
                                                          ]);
 
                                 thisObj.consumersGrid = new UpdatableStore(thisObj.queueData.consumers, "consumers",
-                                                         [ { name: "Name",    field: "name",      width: "120px"},
-                                                           { name: "Mode", field: "distributionMode",          width: "120px"}
+                                                         [ { name: "Name",    field: "name",      width: "70px"},
+                                                           { name: "Mode", field: "distributionMode", width: "70px"},
+                                                           { name: "Msgs Rate", field: "msgRate",
+                                                           width: "150px"},
+                                                           { name: "Bytes Rate", field: "bytesRate",
+                                                              width: "150px"}
                                                          ]);
 
 
@@ -209,13 +270,53 @@ require(["dojo/store/JsonRest",
                     for(var i=0; i < bindings.length; i++)
                     {
                         bindings[i].arguments = dojo.toJson(bindings[i].arguments);
+                        bindings[i].argumentString = dojo.toJson(bindings[i].arguments);
 
                     }
 
+
+                    var consumers = thisObj.queueData[ "consumers" ];
+                    if(consumers)
+                    {
+                        for(var i=0; i < consumers.length; i++)
+                        {
+                            var stats = consumers[i][ "statistics" ];
+
+                            // flatten statistics into attributes
+                            for(var propName in stats)
+                            {
+                                consumers[i][ propName ] = stats[ propName ];
+                            }
+                        }
+                    }
                     thisObj.updateHeader();
 
-                    queueData = data[0];
-                    stats = queueData[ "statistics" ];
+
+                    // update alerting info
+                    alertRepeatGap = new formatTime( thisObj.queueData["alertRepeatGap"] );
+
+                    dom.byId("alertRepeatGap").innerHTML = alertRepeatGap.value;
+                    dom.byId("alertRepeatGapUnits").innerHTML = alertRepeatGap.units;
+
+
+                    alertMsgAge = new formatTime( thisObj.queueData["alertThresholdMessageAge"] );
+
+                    dom.byId("alertThresholdMessageAge").innerHTML = alertMsgAge.value;
+                    dom.byId("alertThresholdMessageAgeUnits").innerHTML = alertMsgAge.units;
+
+                    alertMsgSize = new formatBytes( thisObj.queueData["alertThresholdMessageSize"] );
+
+                    dom.byId("alertThresholdMessageSize").innerHTML = alertMsgSize.value;
+                    dom.byId("alertThresholdMessageSizeUnits").innerHTML = alertMsgSize.units;
+
+                    alertQueueDepth = new formatBytes( thisObj.queueData["alertThresholdQueueDepthBytes"] );
+
+                    dom.byId("alertThresholdQueueDepthBytes").innerHTML = alertQueueDepth.value;
+                    dom.byId("alertThresholdQueueDepthBytesUnits").innerHTML = alertQueueDepth.units;
+
+                    dom.byId("alertThresholdQueueDepthMessages").innerHTML = thisObj.queueData["alertThresholdQueueDepthMessages"];
+
+                    stats = thisObj.queueData[ "statistics" ];
 
                     var sampleTime = new Date();
                     var messageIn = stats["totalEnqueuedMessages"];
@@ -242,6 +343,32 @@ require(["dojo/store/JsonRest",
                         dom.byId("bytesOutRate").innerHTML = "(" + bytesOutFormat.value;
                         dom.byId("bytesOutRateUnits").innerHTML = bytesOutFormat.units + "/s)"
 
+                        if(consumers && thisObj.consumers)
+                        {
+                            for(var i=0; i < consumers.length; i++)
+                            {
+                                var consumer = consumers[i];
+                                for(var j = 0; j < thisObj.consumers.length; j++)
+                                {
+                                    var oldConsumer = thisObj.consumers[j];
+                                    if(oldConsumer.id == consumer.id)
+                                    {
+                                        var msgRate = (1000 * (consumer.messagesOut - oldConsumer.messagesOut)) /
+                                                        samplePeriod;
+                                        consumer.msgRate = msgRate.toFixed(0) + "msg/s";
+
+                                        var bytesRate = (1000 * (consumer.bytesOut - oldConsumer.bytesOut)) /
+                                                        samplePeriod
+                                        var bytesRateFormat = new formatBytes( bytesRate );
+                                        consumer.bytesRate = bytesRateFormat.value + bytesRateFormat.units + "/s";
+                                    }
+
+
+                                }
+
+                            }
+                        }
+
                     }
 
                     thisObj.sampleTime = sampleTime;
@@ -249,8 +376,12 @@ require(["dojo/store/JsonRest",
                     thisObj.bytesIn = bytesIn;
                     thisObj.messageOut = messageOut;
                     thisObj.bytesOut = bytesOut;
+                    thisObj.consumers = consumers;
 
+                    // update bindings
                     thisObj.bindingsGrid.update(thisObj.queueData.bindings)
+
+                    // update consumers
                     thisObj.consumersGrid.update(thisObj.queueData.consumers)
 
 

Modified: qpid/branches/java-config-and-management/qpid/java/broker-plugins/management/src/main/java/resources/queue.html
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker-plugins/management/src/main/java/resources/queue.html?rev=1304877&r1=1304876&r2=1304877&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker-plugins/management/src/main/java/resources/queue.html (original)
+++ qpid/branches/java-config-and-management/qpid/java/broker-plugins/management/src/main/java/resources/queue.html Sat Mar 24 18:14:19 2012
@@ -43,14 +43,14 @@
             <span id="queueDepthMessages" style="position:absolute; right:9.5em"></span>
             <span style="position:absolute; right: 5em; width: 4em"> msgs</span>
             <span id="queueDepthBytes" style="position:absolute; right: 3.3em">(</span>
-            <span id="queueDepthBytesUnits"style="position:absolute; right: 0em; width: 3em">)</span>
+            <span id="queueDepthBytesUnits" style="position:absolute; right: 0em; width: 3em">)</span>
             <br/>
             <span style="">State:</span><span id="state" style="position:absolute; left:6em"></span>
             <span style="position:absolute; left:26em">Pre-fetched:</span>
-            <span id="unacknowledgedMessages" style="position:absolute; right:9.5em">500</span>
+            <span id="unacknowledgedMessages" style="position:absolute; right:9.5em"></span>
             <span style="position:absolute; right: 5em; width: 4em"> msgs</span>
-            <span id="unacknowledgedBytes" style="position:absolute; right: 3.3em">(256</span>
-            <span id="unacknowledgedBytesUnits" style="position:absolute; right: 0em; width: 3em">Kb)</span>
+            <span id="unacknowledgedBytes" style="position:absolute; right: 3.3em"></span>
+            <span id="unacknowledgedBytesUnits" style="position:absolute; right: 0em; width: 3em"></span>
             <br/>
             <span style="">Durable:</span><span id="durable" style="position:absolute; left:6em"></span>
             <span style="position:absolute; left:26em">Inbound:</span>
@@ -76,7 +76,33 @@
             </div>
             <br/>
             <div data-dojo-type="dijit.TitlePane" data-dojo-props="title: 'Alerting Thresholds'">
-                TODO
+                <span style="">Max. Queue Size:</span>
+                <span id="alertThresholdQueueDepthMessages"
+                      style="position:absolute; left:8em; width:8em; text-align:right"></span>
+                <span style="position:absolute; left:16.2em">msgs</span>
+
+                <span id="alertThresholdQueueDepthBytes"
+                      style="position:absolute; left:20em; width:8em; text-align:right"></span>
+                <span id="alertThresholdQueueDepthBytesUnits" style="position:absolute; left:28.2em"></span>
+                <br>
+                <span style="">Max. Message Age:</span>
+                <span id="alertThresholdMessageAge"
+                      style="position:absolute; left:8em; width:8em; text-align:right"></span>
+                <span id="alertThresholdMessageAgeUnits" style="position:absolute; left:16.2em"></span>
+
+                <span style="position:absolute; left:21em">Size: </span>
+                <span id="alertThresholdMessageSize"
+                      style="position:absolute; left:23em; width:5em; text-align:right"></span>
+                <span id="alertThresholdMessageSizeUnits" style="position:absolute; left:28.2em"></span>
+                <br/>
+                <br/>
+                <span style="">Alert frequency:</span>
+                <span id="alertRepeatGap"
+                      style="position:absolute; left:8em; width:8em; text-align:right"></span>
+                <span id="alertRepeatGapUnits" style="position:absolute; left:16.2em"></span>
+
+
+
             </div>
         </div>
         <!--<div data-dojo-type="dijit.layout.ContentPane" data-dojo-props="region:'trailing'">Trailing pane</div>-->

Modified: qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java?rev=1304877&r1=1304876&r2=1304877&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java Sat Mar 24 18:14:19 2012
@@ -33,6 +33,7 @@ public class ConsumerAdapter extends Abs
 {
     private final Subscription _subscription;
     private final QueueAdapter _queue;
+    private final ConsumerStatistics _statistics;
 
     public ConsumerAdapter(final QueueAdapter queueAdapter, final Subscription subscription)
     {
@@ -44,6 +45,7 @@ public class ConsumerAdapter extends Abs
 
         _subscription = subscription;
         _queue = queueAdapter;
+        _statistics = new ConsumerStatistics();
         //TODO
     }
 
@@ -169,6 +171,40 @@ public class ConsumerAdapter extends Abs
 
     public Statistics getStatistics()
     {
-        return NoStatistics.getInstance();
+        return _statistics;
+    }
+
+    private class ConsumerStatistics implements Statistics
+    {
+
+        public Collection<String> getStatisticNames()
+        {
+            return AVAILABLE_STATISTICS;
+        }
+
+        public Object getStatistic(String name)
+        {
+            if(name.equals(BYTES_OUT))
+            {
+                return _subscription.getBytesOut();
+            }
+            else if(name.equals(MESSAGES_OUT))
+            {
+                return _subscription.getMessagesOut();
+            }
+            else if(name.equals(STATE_CHANGED))
+            {
+
+            }
+            else if(name.equals(UNACKNOWLEDGED_BYTES))
+            {
+                return _subscription.getUnacknowledgedBytes();
+            }
+            else if(name.equals(UNACKNOWLEDGED_MESSAGES))
+            {
+                return _subscription.getUnacknowledgedMessages();
+            }
+            return null;  // TODO - Implement
+        }
     }
 }

Modified: qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ImplicitAcceptDispositionChangeListener.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ImplicitAcceptDispositionChangeListener.java?rev=1304877&r1=1304876&r2=1304877&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ImplicitAcceptDispositionChangeListener.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ImplicitAcceptDispositionChangeListener.java Sat Mar 24 18:14:19 2012
@@ -72,6 +72,10 @@ class ImplicitAcceptDispositionChangeLis
     public boolean acquire()
     {
         boolean acquired = _entry.acquire(getSubscription());
+        if(acquired)
+        {
+            getSubscription().recordUnacknowledged(_entry);
+        }
         return acquired;
 
     }

Modified: qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java?rev=1304877&r1=1304876&r2=1304877&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java Sat Mar 24 18:14:19 2012
@@ -33,6 +33,14 @@ public interface Subscription
 
     boolean isTransient();
 
+    long getBytesOut();
+
+    long getMessagesOut();
+
+    long getUnacknowledgedBytes();
+
+    long getUnacknowledgedMessages();
+
     public static enum State
     {
         ACTIVE,

Modified: qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java?rev=1304877&r1=1304876&r2=1304877&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java Sat Mar 24 18:14:19 2012
@@ -93,8 +93,13 @@ public abstract class SubscriptionImpl i
     private LogActor _logActor;
     private UUID _id;
     private final AtomicLong _deliveredCount = new AtomicLong(0);
-    private long _createTime = System.currentTimeMillis();
+    private final AtomicLong _deliveredBytes = new AtomicLong(0);
+
+    private final AtomicLong _unacknowledgedCount = new AtomicLong(0);
+    private final AtomicLong _unacknowledgedBytes = new AtomicLong(0);
 
+    private long _createTime = System.currentTimeMillis();
+    
 
     static final class BrowserSubscription extends SubscriptionImpl
     {
@@ -277,22 +282,13 @@ public abstract class SubscriptionImpl i
         public void send(QueueEntry entry, boolean batch) throws AMQException
         {
 
-            // if we do not need to wait for client acknowledgements
-            // we can decrement the reference count immediately.
-
-            // By doing this _before_ the send we ensure that it
-            // doesn't get sent if it can't be dequeued, preventing
-            // duplicate delivery on recovery.
-
-            // The send may of course still fail, in which case, as
-            // the message is unacked, it will be lost.
-
+            
             synchronized (getChannel())
             {
                 getChannel().getProtocolSession().setDeferFlush(batch);
                 long deliveryTag = getChannel().getNextDeliveryTag();
 
-
+                addUnacknowledgedMessage(entry);
                 recordMessageDelivery(entry, deliveryTag);
                 sendToClient(entry, deliveryTag);
 
@@ -693,6 +689,7 @@ public abstract class SubscriptionImpl i
     {
         _deliveryMethod.deliverToClient(this,entry,deliveryTag);
         _deliveredCount.incrementAndGet();
+        _deliveredBytes.addAndGet(entry.getSize());
     }
 
 
@@ -828,4 +825,44 @@ public abstract class SubscriptionImpl i
 
         _channel.getProtocolSession().flushBatched();
     }
+
+    public long getBytesOut()
+    {
+        return _deliveredBytes.longValue();
+    }
+
+    public long getMessagesOut()
+    {
+        return _deliveredCount.longValue();
+    }
+
+
+    protected void addUnacknowledgedMessage(QueueEntry entry)
+    {
+        final long size = entry.getSize();
+        _unacknowledgedBytes.addAndGet(size);
+        _unacknowledgedCount.incrementAndGet();
+        entry.addStateChangeListener(new QueueEntry.StateChangeListener()
+        {
+            public void stateChanged(QueueEntry entry, QueueEntry.State oldState, QueueEntry.State newState)
+            {
+                if(oldState.equals(QueueEntry.State.ACQUIRED) && !newState.equals(QueueEntry.State.ACQUIRED))
+                {
+                    _unacknowledgedBytes.addAndGet(-size);
+                    _unacknowledgedCount.decrementAndGet();
+                    entry.removeStateChangeListener(this);
+                }
+            }
+        });
+    }
+    
+    public long getUnacknowledgedBytes()
+    {
+        return _unacknowledgedBytes.longValue();
+    }
+
+    public long getUnacknowledgedMessages()
+    {
+        return _unacknowledgedCount.longValue();
+    }
 }

Modified: qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java?rev=1304877&r1=1304876&r2=1304877&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java Sat Mar 24 18:14:19 2012
@@ -130,6 +130,10 @@ public class Subscription_0_10 implement
     private String _trace;
     private final long _createTime = System.currentTimeMillis();
     private final AtomicLong _deliveredCount = new AtomicLong(0);
+    private final AtomicLong _deliveredBytes = new AtomicLong(0);
+    private final AtomicLong _unacknowledgedCount = new AtomicLong(0);
+    private final AtomicLong _unacknowledgedBytes = new AtomicLong(0);
+
     private final Map<String, Object> _arguments;
     private int _deferredMessageCredit;
     private long _deferredSizeCredit;
@@ -631,10 +635,15 @@ public class Subscription_0_10 implement
             _session.sendMessage(xfr, _postIdSettingAction);
             entry.incrementDeliveryCount();
             _deliveredCount.incrementAndGet();
+            _deliveredBytes.addAndGet(entry.getSize());
             if(_acceptMode == MessageAcceptMode.NONE && _acquireMode == MessageAcquireMode.PRE_ACQUIRED)
             {
                 forceDequeue(entry, false);
             }
+            else if(_acquireMode == MessageAcquireMode.PRE_ACQUIRED)
+            {
+                recordUnacknowledged(entry);
+            }
         }
         else
         {
@@ -643,6 +652,12 @@ public class Subscription_0_10 implement
         }
     }
 
+    void recordUnacknowledged(QueueEntry entry)
+    {
+        _unacknowledgedCount.incrementAndGet();
+        _unacknowledgedBytes.addAndGet(entry.getSize());
+    }
+
     private void deferredAddCredit(final int deferredMessageCredit, final long deferredSizeCredit)
     {
         _deferredMessageCredit += deferredMessageCredit;
@@ -934,6 +949,8 @@ public class Subscription_0_10 implement
         // TODO Fix Store Context / cleanup
         if(entry.isAcquiredBy(this))
         {
+            _unacknowledgedBytes.addAndGet(-entry.getSize());
+            _unacknowledgedCount.decrementAndGet();
             entry.discard();
         }
     }
@@ -1085,4 +1102,24 @@ public class Subscription_0_10 implement
     {
         _session.getConnection().flush();
     }
+
+    public long getBytesOut()
+    {
+        return _deliveredBytes.longValue();
+    }
+
+    public long getMessagesOut()
+    {
+        return _deliveredCount.longValue();
+    }
+
+    public long getUnacknowledgedBytes()
+    {
+        return _unacknowledgedBytes.longValue();
+    }
+
+    public long getUnacknowledgedMessages()
+    {
+        return _unacknowledgedCount.longValue();
+    }
 }

Modified: qpid/branches/java-config-and-management/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java?rev=1304877&r1=1304876&r2=1304877&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java Sat Mar 24 18:14:19 2012
@@ -122,6 +122,26 @@ public class MockSubscription implements
         return false;
     }
 
+    public long getBytesOut()
+    {
+        return 0;  // TODO - Implement
+    }
+
+    public long getMessagesOut()
+    {
+        return 0;  // TODO - Implement
+    }
+
+    public long getUnacknowledgedBytes()
+    {
+        return 0;  // TODO - Implement
+    }
+
+    public long getUnacknowledgedMessages()
+    {
+        return 0;  // TODO - Implement
+    }
+
     public AMQQueue getQueue()
     {
         return queue;



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org