You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/11/15 10:51:41 UTC

svn commit: r1769773 - in /qpid/java/branches/remove-queue-runner: ./ broker-core/src/main/java/org/apache/qpid/server/consumer/ broker-core/src/main/java/org/apache/qpid/server/model/ broker-core/src/main/java/org/apache/qpid/server/protocol/ broker-c...

Author: rgodfrey
Date: Tue Nov 15 10:51:40 2016
New Revision: 1769773

URL: http://svn.apache.org/viewvc?rev=1769773&view=rev
Log:
Merged from trunk up to r1769383

Added:
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueueEntryList.java
      - copied unchanged from r1769383, qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueueEntryList.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueStatistics.java
      - copied unchanged from r1769383, qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueStatistics.java
Removed:
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryListBase.java
Modified:
    qpid/java/branches/remove-queue-runner/   (props changed)
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/protocol/CapacityChecker.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueImpl.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueImpl.java
    qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java
    qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java
    qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java
    qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
    qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
    qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java

Propchange: qpid/java/branches/remove-queue-runner/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Nov 15 10:51:40 2016
@@ -9,5 +9,5 @@
 /qpid/branches/java-broker-vhost-refactor/java:1493674-1494547
 /qpid/branches/java-network-refactor/qpid/java:805429-821809
 /qpid/branches/qpid-2935/qpid/java:1061302-1072333
-/qpid/java/trunk:1767741-1769382
+/qpid/java/trunk:1767741-1769383
 /qpid/trunk/qpid:796646-796653

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java?rev=1769773&r1=1769772&r2=1769773&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java Tue Nov 15 10:51:40 2016
@@ -57,7 +57,7 @@ public interface ConsumerTarget
 
     long getUnacknowledgedMessages();
 
-    AMQSessionModel getSessionModel();
+    AMQSessionModel<?> getSessionModel();
 
     long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch);
 

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java?rev=1769773&r1=1769772&r2=1769773&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java Tue Nov 15 10:51:40 2016
@@ -367,12 +367,6 @@ public interface Queue<X extends Queue<X
 
     void requeue(QueueEntry entry);
 
-    void dequeue(QueueEntry entry);
-
-    void decrementUnackedMsgCount(QueueEntry queueEntry);
-
-    void incrementUnackedMsgCount(QueueEntry entry);
-
     List<? extends QueueEntry> getMessagesOnTheQueue();
 
     List<Long> getMessagesOnTheQueue(int num);
@@ -410,4 +404,6 @@ public interface Queue<X extends Queue<X
     long getPotentialMemoryFootprint();
 
     boolean isHeld(QueueEntry queueEntry, final long evaluationTime);
+
+    void checkCapacity();
 }

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/protocol/CapacityChecker.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/protocol/CapacityChecker.java?rev=1769773&r1=1769772&r2=1769773&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/protocol/CapacityChecker.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/protocol/CapacityChecker.java Tue Nov 15 10:51:40 2016
@@ -22,5 +22,5 @@ package org.apache.qpid.server.protocol;
 
 public interface CapacityChecker
 {
-    void checkCapacity(AMQSessionModel channel);
+    void checkCapacity(AMQSessionModel<?> channel);
 }

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1769773&r1=1769772&r2=1769773&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Tue Nov 15 10:51:40 2016
@@ -154,26 +154,11 @@ public abstract class AbstractQueue<X ex
 
     private volatile QueueConsumer<?> _exclusiveSubscriber;
 
-    private final AtomicInteger _atomicQueueCount = new AtomicInteger(0);
-
-    private final AtomicLong _atomicQueueSize = new AtomicLong(0L);
-
     private final AtomicLong _targetQueueSize = new AtomicLong(INITIAL_TARGET_QUEUE_SIZE);
 
     private final AtomicInteger _activeSubscriberCount = new AtomicInteger();
 
-    private final AtomicLong _dequeueCount = new AtomicLong();
-    private final AtomicLong _dequeueSize = new AtomicLong();
-    private final AtomicLong _enqueueCount = new AtomicLong();
-    private final AtomicLong _enqueueSize = new AtomicLong();
-    private final AtomicLong _persistentMessageEnqueueSize = new AtomicLong();
-    private final AtomicLong _persistentMessageDequeueSize = new AtomicLong();
-    private final AtomicLong _persistentMessageEnqueueCount = new AtomicLong();
-    private final AtomicLong _persistentMessageDequeueCount = new AtomicLong();
-    private final AtomicLong _unackedMsgCount = new AtomicLong(0);
-    private final AtomicLong _unackedMsgBytes = new AtomicLong();
-
-    private final AtomicInteger _bindingCountHigh = new AtomicInteger();
+    private final QueueStatistics _queueStatistics = new QueueStatistics();
 
     /** max allowed size(KB) of a single message */
     @ManagedAttributeField( afterSet = "updateAlertChecks" )
@@ -390,7 +375,7 @@ public abstract class AbstractQueue<X ex
 
         Map<String,Object> attributes = getActualAttributes();
 
-        final LinkedHashMap<String, Object> arguments = new LinkedHashMap<String, Object>(attributes);
+        final LinkedHashMap<String, Object> arguments = new LinkedHashMap<>(attributes);
 
         arguments.put(Queue.EXCLUSIVE, _exclusive);
         arguments.put(Queue.LIFETIME_POLICY, getLifetimePolicy());
@@ -1077,15 +1062,6 @@ public abstract class AbstractQueue<X ex
     public void addBinding(final Binding<?> binding)
     {
         _bindings.add(binding);
-        int bindingCount = _bindings.size();
-        int bindingCountHigh;
-        while(bindingCount > (bindingCountHigh = _bindingCountHigh.get()))
-        {
-            if(_bindingCountHigh.compareAndSet(bindingCountHigh, bindingCount))
-            {
-                break;
-            }
-        }
         childAdded(binding);
     }
 
@@ -1114,8 +1090,6 @@ public abstract class AbstractQueue<X ex
 
     public final void enqueue(ServerMessage message, Action<? super MessageInstance> action, MessageEnqueueRecord enqueueRecord)
     {
-        incrementQueueCount();
-        incrementQueueSize(message);
 
         if(_recovering.get() != RECOVERED)
         {
@@ -1148,15 +1122,13 @@ public abstract class AbstractQueue<X ex
             doEnqueue(message, action, enqueueRecord);
         }
 
-        long estimatedQueueSize = _atomicQueueSize.get() + _atomicQueueCount.get() * _estimatedAverageMessageHeaderSize;
+        long estimatedQueueSize = _queueStatistics.getQueueSize() + _queueStatistics.getQueueCount() * _estimatedAverageMessageHeaderSize;
         _flowToDiskChecker.flowToDiskAndReportIfNecessary(message.getStoredMessage(), estimatedQueueSize,
                                                           _targetQueueSize.get());
     }
 
     public final void recover(ServerMessage message, final MessageEnqueueRecord enqueueRecord)
     {
-        incrementQueueCount();
-        incrementQueueSize(message);
         doEnqueue(message, null, enqueueRecord);
     }
 
@@ -1229,7 +1201,7 @@ public abstract class AbstractQueue<X ex
             {
                 arrivalTime = System.currentTimeMillis();
             }
-            if(expiration != 0l)
+            if(expiration != 0L)
             {
                 long calculatedExpiration = arrivalTime+_minimumMessageTtl;
                 if(calculatedExpiration > expiration)
@@ -1239,14 +1211,14 @@ public abstract class AbstractQueue<X ex
                 }
             }
         }
-        if(_maximumMessageTtl != 0l)
+        if(_maximumMessageTtl != 0L)
         {
             if(arrivalTime == 0)
             {
                 arrivalTime = System.currentTimeMillis();
             }
             long calculatedExpiration = arrivalTime+_maximumMessageTtl;
-            if(expiration == 0l || expiration > calculatedExpiration)
+            if(expiration == 0L || expiration > calculatedExpiration)
             {
                 entry.setExpiration(calculatedExpiration);
             }
@@ -1278,19 +1250,6 @@ public abstract class AbstractQueue<X ex
         // Simple Queues don't :-)
     }
 
-    private void incrementQueueSize(final ServerMessage message)
-    {
-        long size = message.getSize();
-        getAtomicQueueSize().addAndGet(size);
-        _enqueueCount.incrementAndGet();
-        _enqueueSize.addAndGet(size);
-        if(message.isPersistent() && isDurable())
-        {
-            _persistentMessageEnqueueSize.addAndGet(size);
-            _persistentMessageEnqueueCount.incrementAndGet();
-        }
-    }
-
     @Override
     public void setTargetSize(final long targetSize)
     {
@@ -1302,17 +1261,12 @@ public abstract class AbstractQueue<X ex
 
     public long getTotalDequeuedMessages()
     {
-        return _dequeueCount.get();
+        return _queueStatistics.getDequeueCount();
     }
 
     public long getTotalEnqueuedMessages()
     {
-        return _enqueueCount.get();
-    }
-
-    private void incrementQueueCount()
-    {
-        getAtomicQueueCount().incrementAndGet();
+        return _queueStatistics.getEnqueueCount();
     }
 
     private void setLastSeenEntry(final QueueConsumer<?> sub, final QueueEntry entry)
@@ -1377,35 +1331,6 @@ public abstract class AbstractQueue<X ex
         }
     }
 
-
-    @Override
-    public void dequeue(QueueEntry entry)
-    {
-        decrementQueueCount();
-        decrementQueueSize(entry);
-        checkCapacity();
-    }
-
-    private void decrementQueueSize(final QueueEntry entry)
-    {
-        final ServerMessage message = entry.getMessage();
-        long size = message.getSize();
-        getAtomicQueueSize().addAndGet(-size);
-        _dequeueSize.addAndGet(size);
-        if(message.isPersistent() && isDurable())
-        {
-            _persistentMessageDequeueSize.addAndGet(size);
-            _persistentMessageDequeueCount.incrementAndGet();
-        }
-    }
-
-    void decrementQueueCount()
-    {
-        getAtomicQueueCount().decrementAndGet();
-        _dequeueCount.incrementAndGet();
-    }
-
-
     public int getConsumerCount()
     {
         return _queueConsumerManager.getAllSize();
@@ -1429,20 +1354,20 @@ public abstract class AbstractQueue<X ex
     @Override
     public int getQueueDepthMessages()
     {
-        return getAtomicQueueCount().get();
+        return _queueStatistics.getQueueCount();
     }
 
     public long getQueueDepthBytes()
     {
-        return getAtomicQueueSize().get();
+        return _queueStatistics.getQueueSize();
     }
 
     @Override
     public long getOldestMessageArrivalTime()
     {
-        long oldestMessageArrivalTime = -1l;
+        long oldestMessageArrivalTime = -1L;
 
-        while(oldestMessageArrivalTime == -1l)
+        while(oldestMessageArrivalTime == -1L)
         {
             QueueEntry entry = getEntries().getOldestEntry();
             if (entry != null)
@@ -1512,16 +1437,6 @@ public abstract class AbstractQueue<X ex
         return getName().compareTo(o.getName());
     }
 
-    public AtomicInteger getAtomicQueueCount()
-    {
-        return _atomicQueueCount;
-    }
-
-    public AtomicLong getAtomicQueueSize()
-    {
-        return _atomicQueueSize;
-    }
-
     private boolean hasExclusiveConsumer()
     {
         return _exclusiveSubscriber != null;
@@ -1535,6 +1450,11 @@ public abstract class AbstractQueue<X ex
     /** Used to track bindings to exchanges so that on deletion they can easily be cancelled. */
     abstract QueueEntryList getEntries();
 
+    final QueueStatistics getQueueStatistics()
+    {
+        return _queueStatistics;
+    }
+
     protected final QueueConsumerManagerImpl getQueueConsumerManager()
     {
         return _queueConsumerManager;
@@ -1545,33 +1465,14 @@ public abstract class AbstractQueue<X ex
         return _virtualHost.getEventLogger();
     }
 
-    public static interface QueueEntryFilter
+    public interface QueueEntryFilter
     {
-        public boolean accept(QueueEntry entry);
+        boolean accept(QueueEntry entry);
 
-        public boolean filterComplete();
+        boolean filterComplete();
     }
 
 
-
-    public List<QueueEntry> getMessagesOnTheQueue(final long fromMessageId, final long toMessageId)
-    {
-        return getMessagesOnTheQueue(new QueueEntryFilter()
-        {
-
-            public boolean accept(QueueEntry entry)
-            {
-                final long messageId = entry.getMessage().getMessageNumber();
-                return messageId >= fromMessageId && messageId <= toMessageId;
-            }
-
-            public boolean filterComplete()
-            {
-                return false;
-            }
-        });
-    }
-
     public QueueEntry getMessageOnTheQueue(final long messageId)
     {
         List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
@@ -1862,27 +1763,28 @@ public abstract class AbstractQueue<X ex
         return Futures.immediateFuture(null);
     }
 
-    public void checkCapacity(AMQSessionModel channel)
+    @Override
+    public void checkCapacity(AMQSessionModel<?> channel)
     {
-        if(_queueFlowControlSizeBytes != 0l)
+        if(_queueFlowControlSizeBytes != 0L)
         {
-            if(_atomicQueueSize.get() > _queueFlowControlSizeBytes)
+            if(_queueStatistics.getQueueSize() > _queueFlowControlSizeBytes)
             {
                 _overfull.set(true);
                 //Overfull log message
-                getEventLogger().message(_logSubject, QueueMessages.OVERFULL(_atomicQueueSize.get(),
+                getEventLogger().message(_logSubject, QueueMessages.OVERFULL(_queueStatistics.getQueueSize(),
                                                                              _queueFlowControlSizeBytes));
 
                 _blockedChannels.add(channel);
 
                 channel.block(this);
 
-                if(_atomicQueueSize.get() <= _queueFlowResumeSizeBytes)
+                if(_queueStatistics.getQueueSize() <= _queueFlowResumeSizeBytes)
                 {
 
                     //Underfull log message
                     getEventLogger().message(_logSubject,
-                                             QueueMessages.UNDERFULL(_atomicQueueSize.get(), _queueFlowResumeSizeBytes));
+                                             QueueMessages.UNDERFULL(_queueStatistics.getQueueSize(), _queueFlowResumeSizeBytes));
 
                    channel.unblock(this);
                    _blockedChannels.remove(channel);
@@ -1895,22 +1797,27 @@ public abstract class AbstractQueue<X ex
         }
     }
 
-    private void checkCapacity()
+    public void checkCapacity()
     {
-        if(_queueFlowControlSizeBytes != 0L)
+        if(getEntries() != null)
         {
-            if(_overfull.get() && _atomicQueueSize.get() <= _queueFlowResumeSizeBytes)
+            if (_queueFlowControlSizeBytes != 0L)
             {
-                if(_overfull.compareAndSet(true,false))
-                {//Underfull log message
-                    getEventLogger().message(_logSubject,
-                                             QueueMessages.UNDERFULL(_atomicQueueSize.get(), _queueFlowResumeSizeBytes));
-                }
-
-                for(final AMQSessionModel blockedChannel : _blockedChannels)
+                if (_overfull.get() && _queueStatistics.getQueueSize() <= _queueFlowResumeSizeBytes)
                 {
-                    blockedChannel.unblock(this);
-                    _blockedChannels.remove(blockedChannel);
+                    if (_overfull.compareAndSet(true, false))
+                    {
+                        //Underfull log message
+                        getEventLogger().message(_logSubject,
+                                                 QueueMessages.UNDERFULL(_queueStatistics.getQueueSize(),
+                                                                         _queueFlowResumeSizeBytes));
+                    }
+
+                    for (final AMQSessionModel<?> blockedChannel : _blockedChannels)
+                    {
+                        blockedChannel.unblock(this);
+                        _blockedChannels.remove(blockedChannel);
+                    }
                 }
             }
         }
@@ -2174,7 +2081,7 @@ public abstract class AbstractQueue<X ex
     {
         QueueEntryIterator queueListIterator = getEntries().iterator();
 
-        final long estimatedQueueSize = _atomicQueueSize.get() + _atomicQueueCount.get() * _estimatedAverageMessageHeaderSize;
+        final long estimatedQueueSize = _queueStatistics.getQueueSize() + _queueStatistics.getQueueCount() * _estimatedAverageMessageHeaderSize;
         _flowToDiskChecker.reportFlowToDiskStatusIfNecessary(estimatedQueueSize, _targetQueueSize.get());
 
         final Set<NotificationCheck> perMessageChecks = new HashSet<>();
@@ -2490,8 +2397,7 @@ public abstract class AbstractQueue<X ex
 
             try
             {
-                 long foo = ByteStreams.copy(inputStream, outputStream);
-                foo = foo +1 -1;
+                ByteStreams.copy(inputStream, outputStream);
             }
             finally
             {
@@ -2538,32 +2444,32 @@ public abstract class AbstractQueue<X ex
 
     public long getTotalEnqueuedBytes()
     {
-        return _enqueueSize.get();
+        return _queueStatistics.getEnqueueSize();
     }
 
     public long getTotalDequeuedBytes()
     {
-        return _dequeueSize.get();
+        return _queueStatistics.getDequeueSize();
     }
 
     public long getPersistentEnqueuedBytes()
     {
-        return _persistentMessageEnqueueSize.get();
+        return _queueStatistics.getPersistentEnqueueSize();
     }
 
     public long getPersistentDequeuedBytes()
     {
-        return _persistentMessageDequeueSize.get();
+        return _queueStatistics.getPersistentDequeueSize();
     }
 
     public long getPersistentEnqueuedMessages()
     {
-        return _persistentMessageEnqueueCount.get();
+        return _queueStatistics.getPersistentEnqueueCount();
     }
 
     public long getPersistentDequeuedMessages()
     {
-        return _persistentMessageDequeueCount.get();
+        return _queueStatistics.getPersistentDequeueCount();
     }
 
     @Override
@@ -2611,26 +2517,12 @@ public abstract class AbstractQueue<X ex
 
     public long getUnacknowledgedMessages()
     {
-        return _unackedMsgCount.get();
+        return _queueStatistics.getUnackedCount();
     }
 
     public long getUnacknowledgedBytes()
     {
-        return _unackedMsgBytes.get();
-    }
-
-    @Override
-    public void decrementUnackedMsgCount(QueueEntry queueEntry)
-    {
-        _unackedMsgCount.decrementAndGet();
-        _unackedMsgBytes.addAndGet(-queueEntry.getSize());
-    }
-
-    @Override
-    public void incrementUnackedMsgCount(QueueEntry entry)
-    {
-        _unackedMsgCount.incrementAndGet();
-        _unackedMsgBytes.addAndGet(entry.getSize());
+        return _queueStatistics.getUnackedSize();
     }
 
     @Override

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueImpl.java?rev=1769773&r1=1769772&r2=1769773&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueImpl.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueImpl.java Tue Nov 15 10:51:40 2016
@@ -44,7 +44,7 @@ public class LastValueQueueImpl extends
     protected void onOpen()
     {
         super.onOpen();
-        _entries = new LastValueQueueList(this);
+        _entries = new LastValueQueueList(this, getQueueStatistics());
     }
 
     @Override

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java?rev=1769773&r1=1769772&r2=1769773&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java Tue Nov 15 10:51:40 2016
@@ -57,9 +57,9 @@ public class LastValueQueueList extends
     private final ConflationQueueEntry _deleteInProgress = new ConflationQueueEntry(this);
     private final ConflationQueueEntry _newerEntryAlreadyBeenAndGone = new ConflationQueueEntry(this);
 
-    public LastValueQueueList(LastValueQueueImpl queue)
+    public LastValueQueueList(LastValueQueue<?> queue, QueueStatistics queueStatistics)
     {
-        super(queue, HEAD_CREATOR);
+        super(queue, queueStatistics, HEAD_CREATOR);
         _conflationKey = queue.getLvqKey();
     }
 

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java?rev=1769773&r1=1769772&r2=1769773&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java Tue Nov 15 10:51:40 2016
@@ -30,7 +30,7 @@ import org.apache.qpid.server.message.Se
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.store.MessageEnqueueRecord;
 
-public abstract class OrderedQueueEntryList implements QueueEntryList
+public abstract class OrderedQueueEntryList extends AbstractQueueEntryList
 {
 
     private final OrderedQueueEntry _head;
@@ -53,8 +53,11 @@ public abstract class OrderedQueueEntryL
     private final AtomicReference<QueueEntry> _unscavengedHWM = new AtomicReference<QueueEntry>();
 
 
-    public OrderedQueueEntryList(Queue<?> queue, HeadCreator headCreator)
+    public OrderedQueueEntryList(Queue<?> queue,
+                                 final QueueStatistics queueStatistics,
+                                 HeadCreator headCreator)
     {
+        super(queue, queueStatistics);
         _queue = queue;
         _scavengeCount = _queue.getContextValue(Integer.class, QUEUE_SCAVANGE_COUNT);
         _head = headCreator.createHead(this);
@@ -84,7 +87,8 @@ public abstract class OrderedQueueEntryL
 
     public QueueEntry add(ServerMessage message, final MessageEnqueueRecord enqueueRecord)
     {
-        OrderedQueueEntry node = createQueueEntry(message, enqueueRecord);
+        final OrderedQueueEntry node = createQueueEntry(message, enqueueRecord);
+        updateStatsOnEnqueue(node);
         for (;;)
         {
             OrderedQueueEntry tail = _tail;

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java?rev=1769773&r1=1769772&r2=1769773&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java Tue Nov 15 10:51:40 2016
@@ -35,7 +35,7 @@ abstract public class PriorityQueueList
     public PriorityQueueList(final PriorityQueueImpl queue,
                              final HeadCreator headCreator)
     {
-        super(queue, headCreator);
+        super(queue, queue.getQueueStatistics(), headCreator);
     }
 
     static class PriorityQueueMasterList extends PriorityQueueList

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java?rev=1769773&r1=1769772&r2=1769773&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java Tue Nov 15 10:51:40 2016
@@ -481,7 +481,6 @@ class QueueConsumerImpl
     public void acquisitionRemoved(final QueueEntry node)
     {
         _target.acquisitionRemoved(node);
-        _queue.decrementUnackedMsgCount(node);
     }
 
     @Override

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1769773&r1=1769772&r2=1769773&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Tue Nov 15 10:51:40 2016
@@ -296,7 +296,7 @@ public abstract class QueueEntryImpl imp
             }
         }
 
-        if(acquired && _stateChangeListeners != null)
+        if(acquired)
         {
             notifyStateChange(AVAILABLE_STATE, state);
         }
@@ -310,7 +310,6 @@ public abstract class QueueEntryImpl imp
         if(acquired)
         {
             _deliveryCountUpdater.compareAndSet(this,-1,0);
-            getQueue().incrementUnackedMsgCount(this);
         }
         return acquired;
     }
@@ -404,7 +403,7 @@ public abstract class QueueEntryImpl imp
     {
         EntryState state = _state;
 
-        if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, AVAILABLE_STATE))
+        if((state.getState() == State.ACQUIRED) && _stateUpdater.compareAndSet(this, state, AVAILABLE_STATE))
         {
             postRelease(state);
         }
@@ -422,15 +421,11 @@ public abstract class QueueEntryImpl imp
 
     private void postRelease(final EntryState previousState)
     {
-        if (previousState instanceof ConsumerAcquiredState)
-        {
-            getQueue().decrementUnackedMsgCount(this);
-        }
 
         if(!getQueue().isDeleted())
         {
             getQueue().requeue(this);
-            if (_stateChangeListeners != null && previousState.getState() == State.ACQUIRED)
+            if (previousState.getState() == State.ACQUIRED)
             {
                 notifyStateChange(previousState, AVAILABLE_STATE);
             }
@@ -523,16 +518,7 @@ public abstract class QueueEntryImpl imp
 
         if(state.getState() == State.ACQUIRED)
         {
-            if (state instanceof ConsumerAcquiredState)
-            {
-                getQueue().decrementUnackedMsgCount(this);
-            }
-
-            getQueue().dequeue(this);
-            if(_stateChangeListeners != null)
-            {
-                notifyStateChange(state, DEQUEUED_STATE);
-            }
+            notifyStateChange(state, DEQUEUED_STATE);
             return true;
         }
         else
@@ -544,6 +530,7 @@ public abstract class QueueEntryImpl imp
 
     private void notifyStateChange(final EntryState oldState, final EntryState newState)
     {
+        _queueEntryList.updateStatsOnStateChange(this, oldState, newState);
         StateChangeListenerEntry<? super QueueEntry, EntryState> entry = _listenersUpdater.get(this);
         while(entry != null)
         {
@@ -562,6 +549,7 @@ public abstract class QueueEntryImpl imp
 
         if(state != DELETED_STATE && _stateUpdater.compareAndSet(this,state,DELETED_STATE))
         {
+            notifyStateChange(state, DELETED_STATE);
             _queueEntryList.entryDeleted(this);
             onDelete();
             _message.release();

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java?rev=1769773&r1=1769772&r2=1769773&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java Tue Nov 15 10:51:40 2016
@@ -24,7 +24,7 @@ import org.apache.qpid.server.message.Se
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.store.MessageEnqueueRecord;
 
-public interface QueueEntryList
+interface QueueEntryList
 {
     Queue<?> getQueue();
 
@@ -44,4 +44,6 @@ public interface QueueEntryList
     
     int getPriorities();
 
+    void updateStatsOnStateChange(QueueEntry entry, QueueEntry.EntryState fromState, QueueEntry.EntryState toState);
+
 }

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java?rev=1769773&r1=1769772&r2=1769773&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java Tue Nov 15 10:51:40 2016
@@ -31,7 +31,7 @@ import org.apache.qpid.server.store.Mess
  * ISBN-13: 978-0262033848
  * see http://en.wikipedia.org/wiki/Red-black_tree
  */
-public class SortedQueueEntryList implements QueueEntryList
+public class SortedQueueEntryList extends AbstractQueueEntryList
 {
     private final SortedQueueEntry _head;
     private SortedQueueEntry _root;
@@ -40,8 +40,9 @@ public class SortedQueueEntryList implem
     private final SortedQueueImpl _queue;
     private final String _propertyName;
 
-    public SortedQueueEntryList(final SortedQueueImpl queue)
+    public SortedQueueEntryList(final SortedQueueImpl queue, final QueueStatistics queueStatistics)
     {
+        super(queue, queueStatistics);
         _queue = queue;
         _head = new SortedQueueEntry(this);
         _propertyName = queue.getSortKey();
@@ -64,6 +65,8 @@ public class SortedQueueEntryList implem
             }
 
             final SortedQueueEntry entry = new SortedQueueEntry(this,message, ++_entryId, enqueueRecord);
+            updateStatsOnEnqueue(entry);
+
             entry.setKey(key);
 
             insert(entry);

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java?rev=1769773&r1=1769772&r2=1769773&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java Tue Nov 15 10:51:40 2016
@@ -50,7 +50,7 @@ public class SortedQueueImpl extends Out
     protected void onOpen()
     {
         super.onOpen();
-        _entries = new SortedQueueEntryList(this);
+        _entries = new SortedQueueEntryList(this, getQueueStatistics());
     }
 
     @Override

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java?rev=1769773&r1=1769772&r2=1769773&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java Tue Nov 15 10:51:40 2016
@@ -35,9 +35,9 @@ public class StandardQueueEntryList exte
         }
     };
 
-    public StandardQueueEntryList(final StandardQueueImpl queue)
+    public StandardQueueEntryList(final StandardQueue<?> queue, QueueStatistics queueStatistics)
     {
-        super(queue, HEAD_CREATOR);
+        super(queue, queueStatistics, HEAD_CREATOR);
     }
 
 

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueImpl.java?rev=1769773&r1=1769772&r2=1769773&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueImpl.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueImpl.java Tue Nov 15 10:51:40 2016
@@ -39,7 +39,7 @@ public class StandardQueueImpl extends A
     protected void onOpen()
     {
         super.onOpen();
-        _entries = new StandardQueueEntryList(this);
+        _entries = new StandardQueueEntryList(this, getQueueStatistics());
     }
 
     @Override

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java?rev=1769773&r1=1769772&r2=1769773&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java Tue Nov 15 10:51:40 2016
@@ -193,6 +193,7 @@ public class QueueMessageRecoveryTest ex
     {
 
         private final List<ServerMessage<?>> _messageList;
+        private final QueueEntryList _entries = mock(QueueEntryList.class);
 
         protected TestQueue(final Map<String, Object> attributes,
                             final QueueManagingVirtualHost<?> virtualHost,
@@ -205,7 +206,7 @@ public class QueueMessageRecoveryTest ex
         @Override
         QueueEntryList getEntries()
         {
-            return null;
+            return _entries;
         }
 
         @Override

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java?rev=1769773&r1=1769772&r2=1769773&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java Tue Nov 15 10:51:40 2016
@@ -33,7 +33,7 @@ public class SelfValidatingSortedQueueEn
 {
     public SelfValidatingSortedQueueEntryList(SortedQueueImpl queue)
     {
-        super(queue);
+        super(queue, queue.getQueueStatistics());
     }
 
     @Override

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java?rev=1769773&r1=1769772&r2=1769773&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java Tue Nov 15 10:51:40 2016
@@ -132,7 +132,12 @@ public class StandardQueueEntryListTest
     public ServerMessage getTestMessageToAdd()
     {
         ServerMessage msg = mock(ServerMessage.class);
+        MessageReference ref = mock(MessageReference.class);
+        when(ref.getMessage()).thenReturn(msg);
         when(msg.getMessageNumber()).thenReturn(1l);
+        when(msg.newReference()).thenReturn(ref);
+        when(msg.newReference(any(TransactionLogResource.class))).thenReturn(ref);
+
         return msg;
     }
 
@@ -146,7 +151,7 @@ public class StandardQueueEntryListTest
     {
         StandardQueueImpl mockQueue = mock(StandardQueueImpl.class);
         when(mockQueue.getContextValue(Integer.class, QUEUE_SCAVANGE_COUNT)).thenReturn(9);
-        OrderedQueueEntryList sqel = new StandardQueueEntryList(mockQueue);
+        OrderedQueueEntryList sqel = new StandardQueueEntryList(mockQueue, new QueueStatistics());
         ConcurrentMap<Integer,QueueEntry> entriesMap = new ConcurrentHashMap<Integer,QueueEntry>();
 
 

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java?rev=1769773&r1=1769772&r2=1769773&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java Tue Nov 15 10:51:40 2016
@@ -189,7 +189,7 @@ public class StandardQueueTest extends A
     private static class DequeuedQueue extends AbstractQueue
     {
 
-        private QueueEntryList _entries = new DequeuedQueueEntryList(this);
+        private QueueEntryList _entries = new DequeuedQueueEntryList(this, getQueueStatistics());
 
         public DequeuedQueue(QueueManagingVirtualHost<?> virtualHost)
         {
@@ -225,9 +225,9 @@ public class StandardQueueTest extends A
                     }
                 };
 
-        public DequeuedQueueEntryList(final DequeuedQueue queue)
+        public DequeuedQueueEntryList(final DequeuedQueue queue, final QueueStatistics queueStatistics)
         {
-            super(queue, HEAD_CREATOR);
+            super(queue, queueStatistics, HEAD_CREATOR);
         }
 
         /**

Modified: qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java?rev=1769773&r1=1769772&r2=1769773&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java Tue Nov 15 10:51:40 2016
@@ -342,7 +342,7 @@ public abstract class ConsumerTarget_0_8
         return _targetAddress;
     }
 
-    public AMQSessionModel getSessionModel()
+    public AMQChannel getSessionModel()
     {
         return _channel;
     }

Modified: qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java?rev=1769773&r1=1769772&r2=1769773&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java Tue Nov 15 10:51:40 2016
@@ -32,7 +32,6 @@ import org.apache.qpid.server.consumer.C
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.plugin.MessageConverter;
-import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.protocol.MessageConverterRegistry;
 import org.apache.qpid.server.protocol.v1_0.codec.ValueHandler;
 import org.apache.qpid.server.protocol.v1_0.messaging.SectionEncoder;
@@ -494,7 +493,7 @@ class ConsumerTarget_1_0 extends Abstrac
     }
 
     @Override
-    public AMQSessionModel getSessionModel()
+    public Session_1_0 getSessionModel()
     {
         return getSession();
     }




---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org