You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/02/28 11:33:05 UTC

svn commit: r1784725 - in /qpid/java/trunk: broker-core/src/main/java/org/apache/qpid/server/model/ broker-core/src/main/java/org/apache/qpid/server/queue/ broker-core/src/main/java/org/apache/qpid/server/store/ broker-core/src/test/java/org/apache/qpi...

Author: orudyy
Date: Tue Feb 28 11:33:05 2017
New Revision: 1784725

URL: http://svn.apache.org/viewvc?rev=1784725&view=rev
Log:
QPID-7618: Address review comments from Keith Wall and fix remaining issues

* Fix upgrader functionality to use correct overlow policy name
  for 'producer flow control'
* Use attribute 'queueDepthBytesIncludingHeader' instead of attribute
  'queueDepthBytes' in producer flow control handler
* Fix UI for queue editing to stop displaying error message
  when overflow policy is not set explicitly
* On queue UI display 'unlimited' for negative queue depth limit values
* Rename methods #getLesserOldestEntry into #getLeastSignificantOldestEntry
* Change SortedQueueEntryList to return the oldest entry from
  QueueEntryList#getLeastSignificantOldestEntry
* Remove redundant code in
  ProducerFlowControlOverflowPolicyHandler.Handler#bulkChangeEnd
  mistakenly added in one of previous commits
* Fix failing tests from test suite DeliveryDelayTest

Modified:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandler.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/RingOverflowPolicyHandler.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandlerTest.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/RingOverflowPolicyHandlerTest.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecovererTest.java
    qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/common/util.js
    qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/common/widgetconfigurer.js
    qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js
    qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/management/editQueue.js
    qpid/java/trunk/doc/java-broker/src/docbkx/concepts/Java-Broker-Concepts-Queues.xml
    qpid/java/trunk/systests/qpid-systests-jms_2.0/src/test/java/org/apache/qpid/systests/jms_2_0/deliverydelay/DeliveryDelayTest.java
    qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/AmqpManagementFacade.java
    qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
    qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java?rev=1784725&r1=1784724&r2=1784725&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java Tue Feb 28 11:33:05 2017
@@ -489,6 +489,6 @@ public interface Queue<X extends Queue<X
 
     void deleteEntry(QueueEntry entry);
 
-    QueueEntry getLesserOldestEntry();
+    QueueEntry getLeastSignificantOldestEntry();
 
 }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1784725&r1=1784724&r2=1784725&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Tue Feb 28 11:33:05 2017
@@ -3166,9 +3166,9 @@ public abstract class AbstractQueue<X ex
     }
 
     @Override
-    public QueueEntry getLesserOldestEntry()
+    public QueueEntry getLeastSignificantOldestEntry()
     {
-        return getEntries().getLesserOldestEntry();
+        return getEntries().getLeastSignificantOldestEntry();
     }
 
     private class MessageFinder implements QueueEntryVisitor

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java?rev=1784725&r1=1784724&r2=1784725&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java Tue Feb 28 11:33:05 2017
@@ -147,7 +147,7 @@ public class LastValueQueueList extends
     }
 
     @Override
-    public QueueEntry getLesserOldestEntry()
+    public QueueEntry getLeastSignificantOldestEntry()
     {
         return getOldestEntry();
     }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java?rev=1784725&r1=1784724&r2=1784725&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java Tue Feb 28 11:33:05 2017
@@ -216,11 +216,11 @@ abstract public class PriorityQueueList
         }
 
         @Override
-        public QueueEntry getLesserOldestEntry()
+        public QueueEntry getLeastSignificantOldestEntry()
         {
             for(PriorityQueueEntrySubList subList : _priorityLists)
             {
-                QueueEntry subListLast = subList.getLesserOldestEntry();
+                QueueEntry subListLast = subList.getLeastSignificantOldestEntry();
                 if(subListLast != null)
                 {
                     return subListLast;
@@ -261,7 +261,7 @@ abstract public class PriorityQueueList
         }
 
         @Override
-        public QueueEntry getLesserOldestEntry()
+        public QueueEntry getLeastSignificantOldestEntry()
         {
             return getOldestEntry();
         }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandler.java?rev=1784725&r1=1784724&r2=1784725&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandler.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandler.java Tue Feb 28 11:33:05 2017
@@ -131,21 +131,6 @@ public class ProducerFlowControlOverflow
             {
                 _queue.removeChangeListener(this);
                 checkUnderfull(-1, -1);
-
-                if (_overfullReported.compareAndSet(true, false))
-                {
-                    _eventLogger.message(_queue.getLogSubject(),
-                                         QueueMessages.UNDERFULL(_queue.getQueueDepthBytes(),
-                                                                 getFlowResumeLimit((long) -1),
-                                                                 (long) _queue.getQueueDepthMessages(),
-                                                                 getFlowResumeLimit((long) -1)));
-                }
-
-                for (final AMQPSession<?, ?> blockedSession : _blockedSessions)
-                {
-                    blockedSession.unblock(_queue);
-                    _blockedSessions.remove(blockedSession);
-                }
             }
         }
 
@@ -156,7 +141,7 @@ public class ProducerFlowControlOverflow
 
         private void checkUnderfull(long maximumQueueDepthBytes, long maximumQueueDepthMessages)
         {
-            long queueDepthBytes = _queue.getQueueDepthBytes();
+            long queueDepthBytes = _queue.getQueueDepthBytesIncludingHeader();
             long queueDepthMessages = _queue.getQueueDepthMessages();
 
             if (isUnderfull(queueDepthBytes, maximumQueueDepthBytes)
@@ -181,7 +166,7 @@ public class ProducerFlowControlOverflow
 
         private void checkOverfull(final long maximumQueueDepthBytes, final long maximumQueueDepthMessages)
         {
-            final long queueDepthBytes = _queue.getQueueDepthBytes();
+            final long queueDepthBytes = _queue.getQueueDepthBytesIncludingHeader();
             final long queueDepthMessages = _queue.getQueueDepthMessages();
 
             if ((maximumQueueDepthBytes >= 0L && queueDepthBytes > maximumQueueDepthBytes) ||
@@ -222,7 +207,7 @@ public class ProducerFlowControlOverflow
         {
             if (maximumQueueDepth >= 0)
             {
-                return (long) (_queueFlowResumeLimit / 100.0 * maximumQueueDepth);
+                return (long) Math.ceil(_queueFlowResumeLimit / 100.0 * maximumQueueDepth);
             }
             return -1;
         }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java?rev=1784725&r1=1784724&r2=1784725&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java Tue Feb 28 11:33:05 2017
@@ -46,6 +46,6 @@ interface QueueEntryList
 
     void updateStatsOnStateChange(QueueEntry entry, QueueEntry.EntryState fromState, QueueEntry.EntryState toState);
 
-    QueueEntry getLesserOldestEntry();
+    QueueEntry getLeastSignificantOldestEntry();
 
 }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/RingOverflowPolicyHandler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/RingOverflowPolicyHandler.java?rev=1784725&r1=1784724&r2=1784725&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/RingOverflowPolicyHandler.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/RingOverflowPolicyHandler.java Tue Feb 28 11:33:05 2017
@@ -28,7 +28,8 @@ public class RingOverflowPolicyHandler i
     private final Queue<?> _queue;
     private final EventLogger _eventLogger;
 
-    RingOverflowPolicyHandler(Queue<?> queue, final EventLogger eventLogger)
+    RingOverflowPolicyHandler(final Queue<?> queue,
+                              final EventLogger eventLogger)
     {
         _queue = queue;
         _eventLogger = eventLogger;
@@ -59,7 +60,7 @@ public class RingOverflowPolicyHandler i
                     overflow = true;
                 }
 
-                QueueEntry entry = _queue.getLesserOldestEntry();
+                QueueEntry entry = _queue.getLeastSignificantOldestEntry();
 
                 if (entry != null)
                 {

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java?rev=1784725&r1=1784724&r2=1784725&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java Tue Feb 28 11:33:05 2017
@@ -465,25 +465,9 @@ public class SortedQueueEntryList extend
     }
 
     @Override
-    public QueueEntry getLesserOldestEntry()
+    public QueueEntry getLeastSignificantOldestEntry()
     {
-        SortedQueueEntry lastNode = null;
-        QueueEntryIterator iterator = iterator();
-        while (iterator.advance())
-        {
-            QueueEntry node = iterator.getNode();
-            if (node != null && !node.isDeleted())
-            {
-                SortedQueueEntry sortedQueueEntry = (SortedQueueEntry)node;
-                if (lastNode == null
-                    || (lastNode.getKey() != null && !lastNode.getKey().equals(sortedQueueEntry.getKey()))
-                    || (lastNode.getKey() == null && sortedQueueEntry.getKey() != null) )
-                {
-                    lastNode = sortedQueueEntry;
-                }
-            }
-        }
-        return lastNode;
+        return getOldestEntry();
     }
 
     /**

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java?rev=1784725&r1=1784724&r2=1784725&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java Tue Feb 28 11:33:05 2017
@@ -49,7 +49,7 @@ public class StandardQueueEntryList exte
 
 
     @Override
-    public QueueEntry getLesserOldestEntry()
+    public QueueEntry getLeastSignificantOldestEntry()
     {
         return getOldestEntry();
     }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java?rev=1784725&r1=1784724&r2=1784725&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java Tue Feb 28 11:33:05 2017
@@ -637,7 +637,7 @@ public class VirtualHostStoreUpgraderAnd
                             }
                             contextMap.put("queue.queueFlowResumeLimit", flowResumeLimit);
                         }
-                        attributes.put("overflowPolicy", "ProducerFlowControl");
+                        attributes.put("overflowPolicy", "PRODUCER_FLOW_CONTROL");
                         attributes.put("maximumQueueDepthBytes", queueFlowControlSizeBytes);
                     }
                 }

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java?rev=1784725&r1=1784724&r2=1784725&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java Tue Feb 28 11:33:05 2017
@@ -212,16 +212,16 @@ public class LastValueQueueListTest exte
         LastValueQueueList queueEntryList = new LastValueQueueList(_queue, _queue.getQueueStatistics());
 
         QueueEntry entry1 =  queueEntryList.add(createTestServerMessage(TEST_KEY_VALUE1), null);
-        assertEquals("Unexpected last message", entry1, queueEntryList.getLesserOldestEntry());
+        assertEquals("Unexpected last message", entry1, queueEntryList.getLeastSignificantOldestEntry());
 
         QueueEntry entry2 =  queueEntryList.add(createTestServerMessage(TEST_KEY_VALUE2), null);
-        assertEquals("Unexpected last message", entry1,  queueEntryList.getLesserOldestEntry());
+        assertEquals("Unexpected last message", entry1,  queueEntryList.getLeastSignificantOldestEntry());
 
         QueueEntry entry3 =  queueEntryList.add(createTestServerMessage(TEST_KEY_VALUE1), null);
-        assertEquals("Unexpected last message", entry2,  queueEntryList.getLesserOldestEntry());
+        assertEquals("Unexpected last message", entry2,  queueEntryList.getLeastSignificantOldestEntry());
 
         queueEntryList.add(createTestServerMessage(TEST_KEY_VALUE2), null);
-        assertEquals("Unexpected last message", entry3,  queueEntryList.getLesserOldestEntry());
+        assertEquals("Unexpected last message", entry3,  queueEntryList.getLeastSignificantOldestEntry());
     }
 
     private int countEntries(LastValueQueueList list)

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java?rev=1784725&r1=1784724&r2=1784725&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java Tue Feb 28 11:33:05 2017
@@ -163,9 +163,9 @@ public class PriorityQueueListTest exten
                 1, _priority4message2.compareTo(_priority5message2));
     }
 
-    public void testGetLesserOldestEntry()
+    public void testGetLeastSignificantOldestEntry()
     {
-        assertEquals("Unexpected last entry", _priority4message1, _list.getLesserOldestEntry());
+        assertEquals("Unexpected last entry", _priority4message1, _list.getLeastSignificantOldestEntry());
 
         ServerMessage<?> message = mock(ServerMessage.class);
         AMQMessageHeader header = mock(AMQMessageHeader.class);
@@ -180,6 +180,6 @@ public class PriorityQueueListTest exten
 
         QueueEntry newEntry = _list.add(message, null);
 
-        assertEquals("Unexpected last entry", newEntry, _list.getLesserOldestEntry());
+        assertEquals("Unexpected last entry", newEntry, _list.getLeastSignificantOldestEntry());
     }
 }

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandlerTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandlerTest.java?rev=1784725&r1=1784724&r2=1784725&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandlerTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandlerTest.java Tue Feb 28 11:33:05 2017
@@ -64,7 +64,7 @@ public class ProducerFlowControlOverflow
         when(_queue.getMaximumQueueDepthMessages()).thenReturn(-1L);
         when(_queue.getOverflowPolicy()).thenReturn(OverflowPolicy.PRODUCER_FLOW_CONTROL);
         when(_queue.getContextValue(Double.class, Queue.QUEUE_FLOW_RESUME_LIMIT)).thenReturn(80.0);
-        when(_queue.getQueueDepthBytes()).thenReturn(0L);
+        when(_queue.getQueueDepthBytesIncludingHeader()).thenReturn(0L);
         when(_queue.getQueueDepthMessages()).thenReturn(0);
         when(_queue.getLogSubject()).thenReturn(_subject);
 
@@ -74,7 +74,7 @@ public class ProducerFlowControlOverflow
     public void testCheckOverflowBlocksSessionWhenOverfullBytes() throws Exception
     {
         AMQPSession<?, ?> session = mock(AMQPSession.class);
-        when(_queue.getQueueDepthBytes()).thenReturn(11L);
+        when(_queue.getQueueDepthBytesIncludingHeader()).thenReturn(11L);
         when(_queue.getMaximumQueueDepthBytes()).thenReturn(10L);
 
         checkOverflow(session);
@@ -105,7 +105,7 @@ public class ProducerFlowControlOverflow
     {
         assertFalse("Flow should not be stopped", _producerFlowControlOverflowPolicyHandler.isQueueFlowStopped());
 
-        when(_queue.getQueueDepthBytes()).thenReturn(11L);
+        when(_queue.getQueueDepthBytesIncludingHeader()).thenReturn(11L);
         when(_queue.getMaximumQueueDepthBytes()).thenReturn(10L);
 
         checkOverflow(mock(AMQPSession.class));
@@ -116,7 +116,7 @@ public class ProducerFlowControlOverflow
     public void testCheckOverflowResumesFlowWhenUnderfullBytes() throws Exception
     {
         AMQPSession<?, ?> session = mock(AMQPSession.class);
-        when(_queue.getQueueDepthBytes()).thenReturn(11L);
+        when(_queue.getQueueDepthBytesIncludingHeader()).thenReturn(11L);
         when(_queue.getMaximumQueueDepthBytes()).thenReturn(10L);
 
         checkOverflow(session);
@@ -126,7 +126,7 @@ public class ProducerFlowControlOverflow
         verify(_eventLogger).message(same(_subject), argThat(new LogMessageMatcher(overfullMessage)));
         assertTrue("Flow should be stopped", _producerFlowControlOverflowPolicyHandler.isQueueFlowStopped());
 
-        when(_queue.getQueueDepthBytes()).thenReturn(8L);
+        when(_queue.getQueueDepthBytesIncludingHeader()).thenReturn(8L);
 
         _producerFlowControlOverflowPolicyHandler.checkOverflow();
 

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/RingOverflowPolicyHandlerTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/RingOverflowPolicyHandlerTest.java?rev=1784725&r1=1784724&r2=1784725&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/RingOverflowPolicyHandlerTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/RingOverflowPolicyHandlerTest.java Tue Feb 28 11:33:05 2017
@@ -67,7 +67,7 @@ public class RingOverflowPolicyHandlerTe
     public void testCheckOverflowWhenOverfullBytes() throws Exception
     {
         QueueEntry lastEntry = createLastEntry();
-        when(_queue.getLesserOldestEntry()).thenReturn(lastEntry, (QueueEntry) null);
+        when(_queue.getLeastSignificantOldestEntry()).thenReturn(lastEntry, (QueueEntry) null);
         when(_queue.getQueueDepthBytesIncludingHeader()).thenReturn(10L, 4L);
         when(_queue.getMaximumQueueDepthBytes()).thenReturn(5L);
         when(_queue.getQueueDepthMessages()).thenReturn(3, 1);
@@ -83,7 +83,7 @@ public class RingOverflowPolicyHandlerTe
     public void testCheckOverflowWhenOverfullMessages() throws Exception
     {
         QueueEntry lastEntry = createLastEntry();
-        when(_queue.getLesserOldestEntry()).thenReturn(lastEntry, (QueueEntry) null);
+        when(_queue.getLeastSignificantOldestEntry()).thenReturn(lastEntry, (QueueEntry) null);
         when(_queue.getQueueDepthMessages()).thenReturn(10, 5);
         when(_queue.getMaximumQueueDepthMessages()).thenReturn(5L);
         when(_queue.getQueueDepthBytesIncludingHeader()).thenReturn(10L, 4L);

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java?rev=1784725&r1=1784724&r2=1784725&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java Tue Feb 28 11:33:05 2017
@@ -421,21 +421,21 @@ public class SortedQueueEntryListTest ex
         validateEntry(entry, "D", 2);
     }
 
-    public void testGetLesserOldestEntry()
+    public void testGetLeastSignificantOldestEntry()
     {
         SortedQueueEntryList list = new SortedQueueEntryList(_testQueue, _testQueue.getQueueStatistics());
 
         SortedQueueEntry entry1 = list.add(generateTestMessage(1, "B"), null);
-        assertEquals("Unexpected last entry", entry1, list.getLesserOldestEntry());
+        assertEquals("Unexpected last entry", entry1, list.getLeastSignificantOldestEntry());
 
-        SortedQueueEntry entry2 = list.add(generateTestMessage(2, "C"), null);
-        assertEquals("Unexpected last entry", entry2, list.getLesserOldestEntry());
+        list.add(generateTestMessage(2, "C"), null);
+        assertEquals("Unexpected last entry", entry1, list.getLeastSignificantOldestEntry());
 
         list.add(generateTestMessage(3, null), null);
-        assertEquals("Unexpected last entry", entry2, list.getLesserOldestEntry());
+        assertEquals("Unexpected last entry", entry1, list.getLeastSignificantOldestEntry());
 
         list.add(generateTestMessage(4, "A"), null);
-        assertEquals("Unexpected last entry", entry2, list.getLesserOldestEntry());
+        assertEquals("Unexpected last entry", entry1, list.getLeastSignificantOldestEntry());
     }
 
     private void validateEntry(final SortedQueueEntry entry, final String expectedSortKey, final long expectedMessageId)

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java?rev=1784725&r1=1784724&r2=1784725&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java Tue Feb 28 11:33:05 2017
@@ -282,13 +282,13 @@ public class StandardQueueEntryListTest
         StandardQueueEntryList queueEntryList = new StandardQueueEntryList(_testQueue, _testQueue.getQueueStatistics());
 
         QueueEntry entry1 =  queueEntryList.add(createServerMessage(1), null);
-        assertEquals("Unexpected last message", entry1, queueEntryList.getLesserOldestEntry());
+        assertEquals("Unexpected last message", entry1, queueEntryList.getLeastSignificantOldestEntry());
 
         queueEntryList.add(createServerMessage(2), null);
-        assertEquals("Unexpected last message", entry1,  queueEntryList.getLesserOldestEntry());
+        assertEquals("Unexpected last message", entry1,  queueEntryList.getLeastSignificantOldestEntry());
 
         queueEntryList.add(createServerMessage(3), null);
-        assertEquals("Unexpected last message", entry1,  queueEntryList.getLesserOldestEntry());
+        assertEquals("Unexpected last message", entry1,  queueEntryList.getLeastSignificantOldestEntry());
     }
 
     private ServerMessage createServerMessage(final long id)

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java?rev=1784725&r1=1784724&r2=1784725&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java Tue Feb 28 11:33:05 2017
@@ -241,7 +241,7 @@ public class StandardQueueTest extends A
 
 
         @Override
-        public QueueEntry getLesserOldestEntry()
+        public QueueEntry getLeastSignificantOldestEntry()
         {
             return getOldestEntry();
         }

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecovererTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecovererTest.java?rev=1784725&r1=1784724&r2=1784725&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecovererTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecovererTest.java Tue Feb 28 11:33:05 2017
@@ -29,6 +29,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
+import org.apache.qpid.server.model.OverflowPolicy;
 import org.apache.qpid.server.model.VirtualHostNode;
 import org.apache.qpid.test.utils.QpidTestCase;
 
@@ -76,6 +77,10 @@ public class VirtualHostStoreUpgraderAnd
         assertEquals("Unexpected queue.queueFlowResumeLimit",
                      "70.00",
                      ((Map<String, String>) upgradedAttributes.get("context")).get("queue.queueFlowResumeLimit"));
+
+        assertEquals("Unexpected overflowPolicy",
+                     OverflowPolicy.PRODUCER_FLOW_CONTROL.name(),
+                     String.valueOf(upgradedAttributes.get("overflowPolicy")));
     }
 
     private ConfiguredObjectRecord findRecordById(UUID id, List<ConfiguredObjectRecord> records)

Modified: qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/common/util.js
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/common/util.js?rev=1784725&r1=1784724&r2=1784725&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/common/util.js (original)
+++ qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/common/util.js Tue Feb 28 11:33:05 2017
@@ -603,16 +603,16 @@ define(["dojo/_base/xhr",
         util.applyMetadataToWidgets = function (domRoot, category, type, meta)
         {
             this.applyToWidgets(domRoot, category, type, null, meta);
-        }
+        };
 
-        util.applyToWidgets = function (domRoot, category, type, data, meta)
+        util.applyToWidgets = function (domRoot, category, type, data, meta, effectiveData)
         {
             var widgets = util.findAllWidgets(domRoot);
             array.forEach(widgets, function (widget)
             {
-                widgetconfigurer.config(widget, category, type, data, meta);
+                widgetconfigurer.config(widget, category, type, data, meta, effectiveData);
             });
-        }
+        };
 
         util.disableWidgetsForImmutableFields = function (domRoot, category, type, meta)
         {
@@ -675,6 +675,13 @@ define(["dojo/_base/xhr",
                         }
                         else
                         {
+                             if (widget.hasOwnProperty("effectiveDefaultValue") &&
+                                 value === widget.effectiveDefaultValue && initialData &&
+                                 (initialData[propName] === null || initialData[propName] === undefined))
+                             {
+                                 // widget value is effective default value, thus, skipping it...
+                                 continue;
+                             }
                             values[propName] = value ? value : null;
                         }
                     }

Modified: qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/common/widgetconfigurer.js
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/common/widgetconfigurer.js?rev=1784725&r1=1784724&r2=1784725&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/common/widgetconfigurer.js (original)
+++ qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/common/widgetconfigurer.js Tue Feb 28 11:33:05 2017
@@ -98,7 +98,7 @@ define(["dojo/_base/xhr",
                 }
             }
         },
-        _processWidgetValue: function (widget, category, type, data, meta)
+        _processWidgetValue: function (widget, category, type, data, meta, effectiveData)
         {
             var widgetName = widget.name;
             if (widgetName)
@@ -121,7 +121,22 @@ define(["dojo/_base/xhr",
 
                 if (widget instanceof dijit.form.FilteringSelect || widget instanceof dojox.form.CheckedMultiSelect)
                 {
-                    var widgetValue = dataValue == null ? defaultValue : dataValue;
+                    var widgetValue = dataValue;
+                    if ((dataValue === null || dataValue === undefined)
+                        && defaultValue !== null && defaultValue !== undefined)
+                    {
+                        if (new String(defaultValue).indexOf("${") == -1)
+                        {
+                            widgetValue = defaultValue;
+                        }
+                        else if (effectiveData &&  effectiveData.hasOwnProperty(widgetName)
+                                 && effectiveData[widgetName] !== null && effectiveData[widgetName] !== undefined)
+                        {
+                            widgetValue =  effectiveData[widgetName];
+                            widget.effectiveDefaultValue = widgetValue;
+                        }
+                    }
+
                     if (widgetValue)
                     {
                         widget.set("value", widgetValue);
@@ -141,12 +156,12 @@ define(["dojo/_base/xhr",
                 }
             }
         },
-        config: function (widget, category, type, data, meta)
+        config: function (widget, category, type, data, meta, effectiveData)
         {
             this._processWidgetPrompt(widget, category, type, meta);
             if (data != null)
             {
-                this._processWidgetValue(widget, category, type, data, meta);
+                this._processWidgetValue(widget, category, type, data, meta, effectiveData);
             }
         },
         disableIfImmutable: function (widget, category, type, meta)

Modified: qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js?rev=1784725&r1=1784724&r2=1784725&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js (original)
+++ qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js Tue Feb 28 11:33:05 2017
@@ -458,6 +458,19 @@ define(["dojo/_base/declare",
 
         }
 
+        function renderMaximumQueueDepth(value)
+        {
+            if (util.isInteger(value))
+            {
+                if (value < 0)
+                {
+                    return "&lt;unlimited&gt;";
+                }
+                return  new String(value);
+            }
+            return "";
+        }
+
         QueueUpdater.prototype.updateHeader = function ()
         {
 
@@ -502,9 +515,8 @@ define(["dojo/_base/declare",
             }
 
             this["overflowPolicy"].innerHTML = entities.encode(this.queueData["overflowPolicy"]);
-            this["maximumQueueDepthBytes"].innerHTML = this.queueData["maximumQueueDepthBytes"] ? entities.encode(new String(this.queueData["maximumQueueDepthBytes"])) : "";
-            this["maximumQueueDepthMessages"].innerHTML = this.queueData["maximumQueueDepthMessages"]  ? entities.encode(new String(this.queueData["maximumQueueDepthMessages"])) : "";
-
+            this["maximumQueueDepthBytes"].innerHTML = renderMaximumQueueDepth(this.queueData.maximumQueueDepthBytes);
+            this["maximumQueueDepthMessages"].innerHTML = renderMaximumQueueDepth(this.queueData.maximumQueueDepthMessages);
             if (this.queueData["messageGroupKey"])
             {
                 this.messageGroupKey.innerHTML = entities.encode(String(this.queueData["messageGroupKey"]));

Modified: qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/management/editQueue.js
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/management/editQueue.js?rev=1784725&r1=1784724&r2=1784725&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/management/editQueue.js (original)
+++ qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/management/editQueue.js Tue Feb 28 11:33:05 2017
@@ -152,7 +152,9 @@ define(["dojox/html/entities",
                     "Queue",
                     data.actual.type,
                     data.actual,
-                    this.management.metadata);
+                    this.management.metadata,
+                    data.effective
+                );
 
                 this.context.setData(data.actual.context, data.effective.context, data.inheritedActual.context);
 

Modified: qpid/java/trunk/doc/java-broker/src/docbkx/concepts/Java-Broker-Concepts-Queues.xml
URL: http://svn.apache.org/viewvc/qpid/java/trunk/doc/java-broker/src/docbkx/concepts/Java-Broker-Concepts-Queues.xml?rev=1784725&r1=1784724&r2=1784725&view=diff
==============================================================================
--- qpid/java/trunk/doc/java-broker/src/docbkx/concepts/Java-Broker-Concepts-Queues.xml (original)
+++ qpid/java/trunk/doc/java-broker/src/docbkx/concepts/Java-Broker-Concepts-Queues.xml Tue Feb 28 11:33:05 2017
@@ -400,9 +400,7 @@ amqp://guest:guest@client1/development?m
                         <emphasis>Ring</emphasis>
                         - The oldest messages are removed. For a
                         <link linkend="Java-Broker-Concepts-Queues-Types-Priority">Priority Queue</link>
-                        oldest messages with lowest priorities are removed. For a
-                        <link linkend="Java-Broker-Concepts-Queues-Types-Sorted">Sorted Queue</link>
-                        oldest messages with lowest keys are removed.
+                        oldest messages with lowest priorities are removed.
                     </para>
                 </listitem>
                 <listitem>

Modified: qpid/java/trunk/systests/qpid-systests-jms_2.0/src/test/java/org/apache/qpid/systests/jms_2_0/deliverydelay/DeliveryDelayTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/qpid-systests-jms_2.0/src/test/java/org/apache/qpid/systests/jms_2_0/deliverydelay/DeliveryDelayTest.java?rev=1784725&r1=1784724&r2=1784725&view=diff
==============================================================================
--- qpid/java/trunk/systests/qpid-systests-jms_2.0/src/test/java/org/apache/qpid/systests/jms_2_0/deliverydelay/DeliveryDelayTest.java (original)
+++ qpid/java/trunk/systests/qpid-systests-jms_2.0/src/test/java/org/apache/qpid/systests/jms_2_0/deliverydelay/DeliveryDelayTest.java Tue Feb 28 11:33:05 2017
@@ -33,6 +33,7 @@ import javax.jms.InvalidDestinationRunti
 import javax.jms.JMSConsumer;
 import javax.jms.JMSContext;
 import javax.jms.JMSProducer;
+import javax.jms.JMSRuntimeException;
 import javax.jms.Message;
 import javax.jms.MessageListener;
 import javax.jms.Session;
@@ -102,9 +103,10 @@ public class DeliveryDelayTest extends Q
                 producer.send(queue, "message");
                 fail("Exception not thrown");
             }
-            catch (InvalidDestinationRuntimeException e)
+            catch (JMSRuntimeException e)
             {
-                // PASS
+                assertTrue("Unexpected exception message: " + e.getMessage(),
+                           e.getMessage().contains("amqp:precondition-failed"));
             }
         }
     }
@@ -142,9 +144,10 @@ public class DeliveryDelayTest extends Q
                 producer.send(publishDest, "message with delivery delay");
                 fail("Exception not thrown");
             }
-            catch (InvalidDestinationRuntimeException e)
+            catch (JMSRuntimeException e)
             {
-                // PASS
+                assertTrue("Unexpected exception message: " + e.getMessage(),
+                           e.getMessage().contains("amqp:precondition-failed"));
             }
         }
     }

Modified: qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/AmqpManagementFacade.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/AmqpManagementFacade.java?rev=1784725&r1=1784724&r2=1784725&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/AmqpManagementFacade.java (original)
+++ qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/AmqpManagementFacade.java Tue Feb 28 11:33:05 2017
@@ -22,6 +22,7 @@ package org.apache.qpid.test.utils;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -170,6 +171,71 @@ public class AmqpManagementFacade
         }
     }
 
+    public Map<String, Object> readEntityUsingAmqpManagement(final Session session,
+                                                             final String type,
+                                                             final String name,
+                                                             final boolean actuals) throws JMSException
+    {
+        MessageProducer producer = session.createProducer(session.createQueue(_qpidBrokerTestCase.isBroker10()
+                                                                                      ? "$management"
+                                                                                      : "ADDR:$management"));
+
+        final TemporaryQueue responseQueue = session.createTemporaryQueue();
+        MessageConsumer consumer = session.createConsumer(responseQueue);
+
+        MapMessage request = session.createMapMessage();
+        request.setStringProperty("type", type);
+        request.setStringProperty("operation", "READ");
+        request.setString("name", name);
+        request.setString("object-path", name);
+        request.setStringProperty("index", "object-path");
+        request.setStringProperty("key", name);
+        request.setBooleanProperty("actuals", actuals);
+        request.setJMSReplyTo(responseQueue);
+
+        producer.send(request);
+        if (session.getTransacted())
+        {
+            session.commit();
+        }
+
+        Message response = consumer.receive(5000);
+        if (session.getTransacted())
+        {
+            session.commit();
+        }
+        try
+        {
+            if (response instanceof MapMessage)
+            {
+                MapMessage bodyMap = (MapMessage) response;
+                Map<String, Object> data = new HashMap<>();
+                Enumeration<String> keys = bodyMap.getMapNames();
+                while (keys.hasMoreElements())
+                {
+                    String key = keys.nextElement();
+                    data.put(key, bodyMap.getObject(key));
+                }
+                return data;
+            }
+            else if (response instanceof ObjectMessage)
+            {
+                Object body = ((ObjectMessage) response).getObject();
+                if (body instanceof Map)
+                {
+                    Map<String, ?> bodyMap = (Map<String, ?>) body;
+                    return new HashMap<>(bodyMap);
+                }
+            }
+            throw new IllegalArgumentException("Cannot parse the results from a management read");
+        }
+        finally
+        {
+            consumer.close();
+            responseQueue.delete();
+        }
+    }
+
     private List<Map<String, Object>> getResultsAsMap(final List<String> attributeNames, final List<List<Object>> attributeValues)
     {
         List<Map<String, Object>> results = new ArrayList<>();

Modified: qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java?rev=1784725&r1=1784724&r2=1784725&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java (original)
+++ qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java Tue Feb 28 11:33:05 2017
@@ -373,6 +373,11 @@ public class QpidBrokerTestCase extends
         return _managementFacade.managementQueryObjects(session, type);
     }
 
+    protected Map<String, Object> managementReadObject(Session session, String type, String name, boolean actuals) throws JMSException
+    {
+        return _managementFacade.readEntityUsingAmqpManagement(session, type, name, actuals);
+    }
+
     public long getQueueDepth(final Connection con, final Queue destination) throws Exception
     {
         return _jmsProvider.getQueueDepth(con, destination);

Modified: qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java?rev=1784725&r1=1784724&r2=1784725&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java (original)
+++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java Tue Feb 28 11:33:05 2017
@@ -26,6 +26,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.jms.BytesMessage;
@@ -64,6 +65,8 @@ public class ProducerFlowControlTest ext
     private RestTestHelper _restTestHelper;
 
     private final AtomicInteger _sentMessages = new AtomicInteger(0);
+    private int _messageSizeIncludingHeader;
+    private Session _utilitySession;
 
     public void setUp() throws Exception
     {
@@ -73,6 +76,11 @@ public class ProducerFlowControlTest ext
         _restTestHelper = new RestTestHelper(getDefaultBroker().getHttpPort());
         _monitor.markDiscardPoint();
 
+        if (!isBroker10())
+        {
+            setSystemProperty("sync_publish", "all");
+        }
+
         _producerConnection = getConnection();
         _producerSession = _producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
@@ -81,6 +89,16 @@ public class ProducerFlowControlTest ext
         _consumerConnection = getConnection();
         _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
+        final Connection utilityConnection = getConnection();
+        utilityConnection.start();
+        _utilitySession = utilityConnection.createSession(true, Session.SESSION_TRANSACTED);
+        String tmpQueueName = getTestQueueName() + "_Tmp";
+        Queue tmpQueue = createTestQueue(_utilitySession, tmpQueueName);
+        MessageProducer  tmpQueueProducer= _utilitySession.createProducer(tmpQueue);
+        tmpQueueProducer.send(nextMessage(0, _utilitySession));
+        _utilitySession.commit();
+
+        _messageSizeIncludingHeader = getQueueDepthBytes(tmpQueueName);
     }
 
     public void tearDown() throws Exception
@@ -107,45 +125,46 @@ public class ProducerFlowControlTest ext
     {
         String queueName = getTestQueueName();
 
-        createAndBindQueueWithFlowControlEnabled(_producerSession, queueName, 1000, 800);
+        int capacity = _messageSizeIncludingHeader * 3 + _messageSizeIncludingHeader / 2;
+        int resumeCapacity = _messageSizeIncludingHeader * 2;
+        createAndBindQueueWithFlowControlEnabled(_producerSession, queueName, capacity, resumeCapacity);
         _producer = _producerSession.createProducer(_queue);
 
         // try to send 5 messages (should block after 4)
-        sendMessagesAsync(_producer, _producerSession, 5, 50L);
-
-        Thread.sleep(5000);
+        CountDownLatch sendLatch = sendMessagesAsync(_producer, _producerSession, 5, 5L).getSendLatch();
 
+        assertTrue("Flow is not stopped", awaitAttributeValue(queueName, "queueFlowStopped", true, 5000));
         assertEquals("Incorrect number of message sent before blocking", 4, _sentMessages.get());
 
         _consumer = _consumerSession.createConsumer(_queue);
         _consumerConnection.start();
 
+        Message message = _consumer.receive(RECEIVE_TIMEOUT);
+        assertNotNull("Message is not received", message);
 
-        _consumer.receive();
-
-        Thread.sleep(1000);
+        assertFalse("Flow is not stopped", awaitAttributeValue(queueName, "queueFlowStopped", false, 1000));
 
         assertEquals("Message incorrectly sent after one message received", 4, _sentMessages.get());
 
-
-        _consumer.receive();
-
-        Thread.sleep(1000);
-
+        Message message2 = _consumer.receive(RECEIVE_TIMEOUT);
+        assertNotNull("Message is not received", message2);
+        assertTrue("Message sending is not finished", sendLatch.await(1000, TimeUnit.MILLISECONDS));
         assertEquals("Message not sent after two messages received", 5, _sentMessages.get());
-
     }
 
 
     public void testBrokerLogMessages() throws Exception
     {
         String queueName = getTestQueueName();
-        
-        createAndBindQueueWithFlowControlEnabled(_producerSession, queueName, 1000, 800);
+
+        int capacity = _messageSizeIncludingHeader * 3 + _messageSizeIncludingHeader / 2;
+        int resumeCapacity = _messageSizeIncludingHeader * 2;
+
+        createAndBindQueueWithFlowControlEnabled(_producerSession, queueName, capacity, resumeCapacity);
         _producer = _producerSession.createProducer(_queue);
 
         // try to send 5 messages (should block after 4)
-        sendMessagesAsync(_producer, _producerSession, 5, 50L);
+        sendMessagesAsync(_producer, _producerSession, 5, 5L);
 
         List<String> results = waitAndFindMatches("QUE-1003", 7000);
 
@@ -165,24 +184,28 @@ public class ProducerFlowControlTest ext
     public void testFlowControlOnCapacityResumeEqual() throws Exception
     {
         String queueName = getTestQueueName();
-        
-        createAndBindQueueWithFlowControlEnabled(_producerSession, queueName, 1000, 1000);
+
+        int capacity = _messageSizeIncludingHeader * 3 + _messageSizeIncludingHeader / 2;
+        createAndBindQueueWithFlowControlEnabled(_producerSession, queueName,
+                                                 capacity,
+                                                 capacity);
         _producer = _producerSession.createProducer(_queue);
 
 
         // try to send 5 messages (should block after 4)
-        sendMessagesAsync(_producer, _producerSession, 5, 50L);
+        CountDownLatch sendLatch = sendMessagesAsync(_producer, _producerSession, 5, 5L).getSendLatch();
 
-        Thread.sleep(5000);
+        assertTrue("Flow is not stopped", awaitAttributeValue(queueName, "queueFlowStopped", true,5000));
 
         assertEquals("Incorrect number of message sent before blocking", 4, _sentMessages.get());
 
         _consumer = _consumerSession.createConsumer(_queue);
         _consumerConnection.start();
 
-        _consumer.receive();
+        Message message = _consumer.receive(RECEIVE_TIMEOUT);
+        assertNotNull("Message is not received", message);
 
-        Thread.sleep(1000);
+        assertTrue("Message sending is not finished", sendLatch.await(1000, TimeUnit.MILLISECONDS));
 
         assertEquals("Message incorrectly sent after one message received", 5, _sentMessages.get());
         
@@ -198,7 +221,9 @@ public class ProducerFlowControlTest ext
         final int numProducers = 10;
         final int numMessages = 100;
 
-        createAndBindQueueWithFlowControlEnabled(_producerSession, queueName, 6000, 3000);
+        final int capacity = _messageSizeIncludingHeader * 20;
+
+        createAndBindQueueWithFlowControlEnabled(_producerSession, queueName, capacity, capacity/2);
 
         _consumerConnection.start();
 
@@ -211,7 +236,7 @@ public class ProducerFlowControlTest ext
             Session session = producers[i].createSession(false, Session.AUTO_ACKNOWLEDGE);
 
             MessageProducer myproducer = session.createProducer(_queue);
-            MessageSender sender = sendMessagesAsync(myproducer, session, numMessages, 50L);
+            MessageSender sender = sendMessagesAsync(myproducer, session, numMessages, 5L);
         }
 
         _consumer = _consumerSession.createConsumer(_queue);
@@ -221,7 +246,6 @@ public class ProducerFlowControlTest ext
         {
         
             Message msg = _consumer.receive(5000);
-            Thread.sleep(50L);
             assertNotNull("Message not received("+j+"), sent: "+_sentMessages.get(), msg);
 
         }
@@ -301,9 +325,9 @@ public class ProducerFlowControlTest ext
     private int getQueueDepthBytes(final String queueName) throws IOException
     {
         // On AMQP 1.0 the size of the message on the broker is not necessarily the size of the message we sent. Therefore, get the actual size from the broker
-        final String requestUrl = String.format("queue/%1$s/%1$s/%2$s/getStatistics?statistics=[\"queueDepthBytes\"]", TestBrokerConfiguration.ENTRY_NAME_VIRTUAL_HOST, queueName);
+        final String requestUrl = String.format("queue/%1$s/%1$s/%2$s/getStatistics?statistics=[\"queueDepthBytesIncludingHeader\"]", TestBrokerConfiguration.ENTRY_NAME_VIRTUAL_HOST, queueName);
         final Map<String, Object> queueAttributes = _restTestHelper.getJsonAsMap(requestUrl);
-        return ((Number) queueAttributes.get("queueDepthBytes")).intValue();
+        return ((Number) queueAttributes.get("queueDepthBytesIncludingHeader")).intValue();
     }
 
     private void waitForFlowControlAndMessageCount(final String queueUrl, final int messageCount, final int timeout) throws InterruptedException, IOException
@@ -341,29 +365,28 @@ public class ProducerFlowControlTest ext
     public void testQueueDeleteWithBlockedFlow() throws Exception
     {
         String queueName = getTestQueueName();
-        createAndBindQueueWithFlowControlEnabled(_producerSession, queueName, 1000, 800, true, false);
+        int capacity = _messageSizeIncludingHeader * 3 + _messageSizeIncludingHeader / 2;
+        int resumeCapacity = _messageSizeIncludingHeader * 2;
+        createAndBindQueueWithFlowControlEnabled(_producerSession, queueName, capacity, resumeCapacity, true, false);
 
         _producer = _producerSession.createProducer(_queue);
 
         // try to send 5 messages (should block after 4)
-        sendMessagesAsync(_producer, _producerSession, 5, 50L);
+        sendMessagesAsync(_producer, _producerSession, 5, 5L);
 
-        Thread.sleep(5000);
+        assertTrue("Flow is not stopped", awaitAttributeValue(queueName, "queueFlowStopped", true,5000));
 
         assertEquals("Incorrect number of message sent before blocking", 4, _sentMessages.get());
 
-        // close blocked producer session and connection
-        _producerConnection.close();
-
         if(!isBroker10())
         {
             // delete queue with a consumer session
-            ((AMQSession<?, ?>) _consumerSession).sendQueueDelete(queueName);
+            ((AMQSession<?, ?>) _utilitySession).sendQueueDelete(queueName);
         }
         else
         {
-            deleteEntityUsingAmqpManagement(getTestQueueName(), _consumerSession, "org.apache.qpid.Queue");
-            createTestQueue(_consumerSession);
+            deleteEntityUsingAmqpManagement(getTestQueueName(), _utilitySession, "org.apache.qpid.Queue");
+            createTestQueue(_utilitySession);
         }
         _consumer = _consumerSession.createConsumer(_queue);
         _consumerConnection.start();
@@ -432,7 +455,7 @@ public class ProducerFlowControlTest ext
         private final Session _senderSession;
         private final int _numMessages;
         private volatile JMSException _exception;
-        private CountDownLatch _exceptionThrownLatch = new CountDownLatch(1);
+        private CountDownLatch _sendLatch = new CountDownLatch(1);
         private long _sleepPeriod;
 
         public MessageSender(MessageProducer producer, Session producerSession, int numMessages, long sleepPeriod)
@@ -452,10 +475,18 @@ public class ProducerFlowControlTest ext
             catch (JMSException e)
             {
                 _exception = e;
-                _exceptionThrownLatch.countDown();
+            }
+            finally
+            {
+                _sendLatch.countDown();
             }
         }
 
+        public CountDownLatch getSendLatch()
+        {
+            return _sendLatch;
+        }
+
         private void sendMessages(MessageProducer producer, Session producerSession, int numMessages, long sleepPeriod)
                 throws JMSException
         {
@@ -500,15 +531,50 @@ public class ProducerFlowControlTest ext
             }
         }
 
-        private final byte[] BYTE_300 = new byte[300];
+    }
+
+    private final byte[] BYTE_300 = new byte[300];
 
-        private Message nextMessage(int msg, Session producerSession) throws JMSException
+    private Message nextMessage(int msg, Session producerSession) throws JMSException
+    {
+        BytesMessage send = producerSession.createBytesMessage();
+        send.writeBytes(BYTE_300);
+        send.setIntProperty("msg", msg);
+        return send;
+    }
+
+    private boolean awaitAttributeValue(String queueName, String attributeName, Object expectedValue, long timeout)
+            throws JMSException, InterruptedException
+    {
+        long startTime = System.currentTimeMillis();
+        long endTime = startTime + timeout;
+        boolean found = false;
+        do
         {
-            BytesMessage send = producerSession.createBytesMessage();
-            send.writeBytes(BYTE_300);
-            send.setIntProperty("msg", msg);
+            Map<String, Object> attributes =
+                    managementReadObject(_utilitySession, "org.apache.qpid.SortedQueue", queueName, false);
+            Object actualValue = attributes.get(attributeName);
+            if (expectedValue == null)
+            {
+                found = actualValue == null;
+            }
+            else if (actualValue != null)
+            {
+                if (actualValue.getClass() == expectedValue.getClass())
+                {
+                    found = expectedValue.equals(actualValue);
+                }
+                else
+                {
+                    found = String.valueOf(expectedValue).equals(String.valueOf(actualValue));
+                }
+            }
 
-            return send;
-        }
+            if (!found)
+            {
+                Thread.sleep(50);
+            }
+        } while (!found && System.currentTimeMillis() <= endTime);
+        return found;
     }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org