You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/02/28 11:33:05 UTC
svn commit: r1784725 - in /qpid/java/trunk:
broker-core/src/main/java/org/apache/qpid/server/model/
broker-core/src/main/java/org/apache/qpid/server/queue/
broker-core/src/main/java/org/apache/qpid/server/store/
broker-core/src/test/java/org/apache/qpi...
Author: orudyy
Date: Tue Feb 28 11:33:05 2017
New Revision: 1784725
URL: http://svn.apache.org/viewvc?rev=1784725&view=rev
Log:
QPID-7618: Address review comments from Keith Wall and fix remaining issues
* Fix upgrader functionality to use correct overflow policy name
for 'producer flow control'
* Use attribute 'queueDepthBytesIncludingHeader' instead of attribute
'queueDepthBytes' in producer flow control handler
* Fix UI for queue editing to stop displaying error message
when overflow policy is not set explicitly
* On queue UI display 'unlimited' for negative queue depth limit values
* Rename methods #getLesserOldestEntry to #getLeastSignificantOldestEntry
* Change SortedQueueEntryList to return the oldest entry from
QueueEntryList#getLeastSignificantOldestEntry
* Remove redundant code in
ProducerFlowControlOverflowPolicyHandler.Handler#bulkChangeEnd
mistakenly added in one of the previous commits
* Fix failing tests from test suite DeliveryDelayTest
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandler.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/RingOverflowPolicyHandler.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandlerTest.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/RingOverflowPolicyHandlerTest.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecovererTest.java
qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/common/util.js
qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/common/widgetconfigurer.js
qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js
qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/management/editQueue.js
qpid/java/trunk/doc/java-broker/src/docbkx/concepts/Java-Broker-Concepts-Queues.xml
qpid/java/trunk/systests/qpid-systests-jms_2.0/src/test/java/org/apache/qpid/systests/jms_2_0/deliverydelay/DeliveryDelayTest.java
qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/AmqpManagementFacade.java
qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java?rev=1784725&r1=1784724&r2=1784725&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java Tue Feb 28 11:33:05 2017
@@ -489,6 +489,6 @@ public interface Queue<X extends Queue<X
void deleteEntry(QueueEntry entry);
- QueueEntry getLesserOldestEntry();
+ QueueEntry getLeastSignificantOldestEntry();
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1784725&r1=1784724&r2=1784725&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Tue Feb 28 11:33:05 2017
@@ -3166,9 +3166,9 @@ public abstract class AbstractQueue<X ex
}
@Override
- public QueueEntry getLesserOldestEntry()
+ public QueueEntry getLeastSignificantOldestEntry()
{
- return getEntries().getLesserOldestEntry();
+ return getEntries().getLeastSignificantOldestEntry();
}
private class MessageFinder implements QueueEntryVisitor
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java?rev=1784725&r1=1784724&r2=1784725&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java Tue Feb 28 11:33:05 2017
@@ -147,7 +147,7 @@ public class LastValueQueueList extends
}
@Override
- public QueueEntry getLesserOldestEntry()
+ public QueueEntry getLeastSignificantOldestEntry()
{
return getOldestEntry();
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java?rev=1784725&r1=1784724&r2=1784725&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java Tue Feb 28 11:33:05 2017
@@ -216,11 +216,11 @@ abstract public class PriorityQueueList
}
@Override
- public QueueEntry getLesserOldestEntry()
+ public QueueEntry getLeastSignificantOldestEntry()
{
for(PriorityQueueEntrySubList subList : _priorityLists)
{
- QueueEntry subListLast = subList.getLesserOldestEntry();
+ QueueEntry subListLast = subList.getLeastSignificantOldestEntry();
if(subListLast != null)
{
return subListLast;
@@ -261,7 +261,7 @@ abstract public class PriorityQueueList
}
@Override
- public QueueEntry getLesserOldestEntry()
+ public QueueEntry getLeastSignificantOldestEntry()
{
return getOldestEntry();
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandler.java?rev=1784725&r1=1784724&r2=1784725&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandler.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandler.java Tue Feb 28 11:33:05 2017
@@ -131,21 +131,6 @@ public class ProducerFlowControlOverflow
{
_queue.removeChangeListener(this);
checkUnderfull(-1, -1);
-
- if (_overfullReported.compareAndSet(true, false))
- {
- _eventLogger.message(_queue.getLogSubject(),
- QueueMessages.UNDERFULL(_queue.getQueueDepthBytes(),
- getFlowResumeLimit((long) -1),
- (long) _queue.getQueueDepthMessages(),
- getFlowResumeLimit((long) -1)));
- }
-
- for (final AMQPSession<?, ?> blockedSession : _blockedSessions)
- {
- blockedSession.unblock(_queue);
- _blockedSessions.remove(blockedSession);
- }
}
}
@@ -156,7 +141,7 @@ public class ProducerFlowControlOverflow
private void checkUnderfull(long maximumQueueDepthBytes, long maximumQueueDepthMessages)
{
- long queueDepthBytes = _queue.getQueueDepthBytes();
+ long queueDepthBytes = _queue.getQueueDepthBytesIncludingHeader();
long queueDepthMessages = _queue.getQueueDepthMessages();
if (isUnderfull(queueDepthBytes, maximumQueueDepthBytes)
@@ -181,7 +166,7 @@ public class ProducerFlowControlOverflow
private void checkOverfull(final long maximumQueueDepthBytes, final long maximumQueueDepthMessages)
{
- final long queueDepthBytes = _queue.getQueueDepthBytes();
+ final long queueDepthBytes = _queue.getQueueDepthBytesIncludingHeader();
final long queueDepthMessages = _queue.getQueueDepthMessages();
if ((maximumQueueDepthBytes >= 0L && queueDepthBytes > maximumQueueDepthBytes) ||
@@ -222,7 +207,7 @@ public class ProducerFlowControlOverflow
{
if (maximumQueueDepth >= 0)
{
- return (long) (_queueFlowResumeLimit / 100.0 * maximumQueueDepth);
+ return (long) Math.ceil(_queueFlowResumeLimit / 100.0 * maximumQueueDepth);
}
return -1;
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java?rev=1784725&r1=1784724&r2=1784725&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java Tue Feb 28 11:33:05 2017
@@ -46,6 +46,6 @@ interface QueueEntryList
void updateStatsOnStateChange(QueueEntry entry, QueueEntry.EntryState fromState, QueueEntry.EntryState toState);
- QueueEntry getLesserOldestEntry();
+ QueueEntry getLeastSignificantOldestEntry();
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/RingOverflowPolicyHandler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/RingOverflowPolicyHandler.java?rev=1784725&r1=1784724&r2=1784725&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/RingOverflowPolicyHandler.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/RingOverflowPolicyHandler.java Tue Feb 28 11:33:05 2017
@@ -28,7 +28,8 @@ public class RingOverflowPolicyHandler i
private final Queue<?> _queue;
private final EventLogger _eventLogger;
- RingOverflowPolicyHandler(Queue<?> queue, final EventLogger eventLogger)
+ RingOverflowPolicyHandler(final Queue<?> queue,
+ final EventLogger eventLogger)
{
_queue = queue;
_eventLogger = eventLogger;
@@ -59,7 +60,7 @@ public class RingOverflowPolicyHandler i
overflow = true;
}
- QueueEntry entry = _queue.getLesserOldestEntry();
+ QueueEntry entry = _queue.getLeastSignificantOldestEntry();
if (entry != null)
{
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java?rev=1784725&r1=1784724&r2=1784725&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java Tue Feb 28 11:33:05 2017
@@ -465,25 +465,9 @@ public class SortedQueueEntryList extend
}
@Override
- public QueueEntry getLesserOldestEntry()
+ public QueueEntry getLeastSignificantOldestEntry()
{
- SortedQueueEntry lastNode = null;
- QueueEntryIterator iterator = iterator();
- while (iterator.advance())
- {
- QueueEntry node = iterator.getNode();
- if (node != null && !node.isDeleted())
- {
- SortedQueueEntry sortedQueueEntry = (SortedQueueEntry)node;
- if (lastNode == null
- || (lastNode.getKey() != null && !lastNode.getKey().equals(sortedQueueEntry.getKey()))
- || (lastNode.getKey() == null && sortedQueueEntry.getKey() != null) )
- {
- lastNode = sortedQueueEntry;
- }
- }
- }
- return lastNode;
+ return getOldestEntry();
}
/**
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java?rev=1784725&r1=1784724&r2=1784725&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java Tue Feb 28 11:33:05 2017
@@ -49,7 +49,7 @@ public class StandardQueueEntryList exte
@Override
- public QueueEntry getLesserOldestEntry()
+ public QueueEntry getLeastSignificantOldestEntry()
{
return getOldestEntry();
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java?rev=1784725&r1=1784724&r2=1784725&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java Tue Feb 28 11:33:05 2017
@@ -637,7 +637,7 @@ public class VirtualHostStoreUpgraderAnd
}
contextMap.put("queue.queueFlowResumeLimit", flowResumeLimit);
}
- attributes.put("overflowPolicy", "ProducerFlowControl");
+ attributes.put("overflowPolicy", "PRODUCER_FLOW_CONTROL");
attributes.put("maximumQueueDepthBytes", queueFlowControlSizeBytes);
}
}
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java?rev=1784725&r1=1784724&r2=1784725&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java Tue Feb 28 11:33:05 2017
@@ -212,16 +212,16 @@ public class LastValueQueueListTest exte
LastValueQueueList queueEntryList = new LastValueQueueList(_queue, _queue.getQueueStatistics());
QueueEntry entry1 = queueEntryList.add(createTestServerMessage(TEST_KEY_VALUE1), null);
- assertEquals("Unexpected last message", entry1, queueEntryList.getLesserOldestEntry());
+ assertEquals("Unexpected last message", entry1, queueEntryList.getLeastSignificantOldestEntry());
QueueEntry entry2 = queueEntryList.add(createTestServerMessage(TEST_KEY_VALUE2), null);
- assertEquals("Unexpected last message", entry1, queueEntryList.getLesserOldestEntry());
+ assertEquals("Unexpected last message", entry1, queueEntryList.getLeastSignificantOldestEntry());
QueueEntry entry3 = queueEntryList.add(createTestServerMessage(TEST_KEY_VALUE1), null);
- assertEquals("Unexpected last message", entry2, queueEntryList.getLesserOldestEntry());
+ assertEquals("Unexpected last message", entry2, queueEntryList.getLeastSignificantOldestEntry());
queueEntryList.add(createTestServerMessage(TEST_KEY_VALUE2), null);
- assertEquals("Unexpected last message", entry3, queueEntryList.getLesserOldestEntry());
+ assertEquals("Unexpected last message", entry3, queueEntryList.getLeastSignificantOldestEntry());
}
private int countEntries(LastValueQueueList list)
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java?rev=1784725&r1=1784724&r2=1784725&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java Tue Feb 28 11:33:05 2017
@@ -163,9 +163,9 @@ public class PriorityQueueListTest exten
1, _priority4message2.compareTo(_priority5message2));
}
- public void testGetLesserOldestEntry()
+ public void testGetLeastSignificantOldestEntry()
{
- assertEquals("Unexpected last entry", _priority4message1, _list.getLesserOldestEntry());
+ assertEquals("Unexpected last entry", _priority4message1, _list.getLeastSignificantOldestEntry());
ServerMessage<?> message = mock(ServerMessage.class);
AMQMessageHeader header = mock(AMQMessageHeader.class);
@@ -180,6 +180,6 @@ public class PriorityQueueListTest exten
QueueEntry newEntry = _list.add(message, null);
- assertEquals("Unexpected last entry", newEntry, _list.getLesserOldestEntry());
+ assertEquals("Unexpected last entry", newEntry, _list.getLeastSignificantOldestEntry());
}
}
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandlerTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandlerTest.java?rev=1784725&r1=1784724&r2=1784725&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandlerTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandlerTest.java Tue Feb 28 11:33:05 2017
@@ -64,7 +64,7 @@ public class ProducerFlowControlOverflow
when(_queue.getMaximumQueueDepthMessages()).thenReturn(-1L);
when(_queue.getOverflowPolicy()).thenReturn(OverflowPolicy.PRODUCER_FLOW_CONTROL);
when(_queue.getContextValue(Double.class, Queue.QUEUE_FLOW_RESUME_LIMIT)).thenReturn(80.0);
- when(_queue.getQueueDepthBytes()).thenReturn(0L);
+ when(_queue.getQueueDepthBytesIncludingHeader()).thenReturn(0L);
when(_queue.getQueueDepthMessages()).thenReturn(0);
when(_queue.getLogSubject()).thenReturn(_subject);
@@ -74,7 +74,7 @@ public class ProducerFlowControlOverflow
public void testCheckOverflowBlocksSessionWhenOverfullBytes() throws Exception
{
AMQPSession<?, ?> session = mock(AMQPSession.class);
- when(_queue.getQueueDepthBytes()).thenReturn(11L);
+ when(_queue.getQueueDepthBytesIncludingHeader()).thenReturn(11L);
when(_queue.getMaximumQueueDepthBytes()).thenReturn(10L);
checkOverflow(session);
@@ -105,7 +105,7 @@ public class ProducerFlowControlOverflow
{
assertFalse("Flow should not be stopped", _producerFlowControlOverflowPolicyHandler.isQueueFlowStopped());
- when(_queue.getQueueDepthBytes()).thenReturn(11L);
+ when(_queue.getQueueDepthBytesIncludingHeader()).thenReturn(11L);
when(_queue.getMaximumQueueDepthBytes()).thenReturn(10L);
checkOverflow(mock(AMQPSession.class));
@@ -116,7 +116,7 @@ public class ProducerFlowControlOverflow
public void testCheckOverflowResumesFlowWhenUnderfullBytes() throws Exception
{
AMQPSession<?, ?> session = mock(AMQPSession.class);
- when(_queue.getQueueDepthBytes()).thenReturn(11L);
+ when(_queue.getQueueDepthBytesIncludingHeader()).thenReturn(11L);
when(_queue.getMaximumQueueDepthBytes()).thenReturn(10L);
checkOverflow(session);
@@ -126,7 +126,7 @@ public class ProducerFlowControlOverflow
verify(_eventLogger).message(same(_subject), argThat(new LogMessageMatcher(overfullMessage)));
assertTrue("Flow should be stopped", _producerFlowControlOverflowPolicyHandler.isQueueFlowStopped());
- when(_queue.getQueueDepthBytes()).thenReturn(8L);
+ when(_queue.getQueueDepthBytesIncludingHeader()).thenReturn(8L);
_producerFlowControlOverflowPolicyHandler.checkOverflow();
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/RingOverflowPolicyHandlerTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/RingOverflowPolicyHandlerTest.java?rev=1784725&r1=1784724&r2=1784725&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/RingOverflowPolicyHandlerTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/RingOverflowPolicyHandlerTest.java Tue Feb 28 11:33:05 2017
@@ -67,7 +67,7 @@ public class RingOverflowPolicyHandlerTe
public void testCheckOverflowWhenOverfullBytes() throws Exception
{
QueueEntry lastEntry = createLastEntry();
- when(_queue.getLesserOldestEntry()).thenReturn(lastEntry, (QueueEntry) null);
+ when(_queue.getLeastSignificantOldestEntry()).thenReturn(lastEntry, (QueueEntry) null);
when(_queue.getQueueDepthBytesIncludingHeader()).thenReturn(10L, 4L);
when(_queue.getMaximumQueueDepthBytes()).thenReturn(5L);
when(_queue.getQueueDepthMessages()).thenReturn(3, 1);
@@ -83,7 +83,7 @@ public class RingOverflowPolicyHandlerTe
public void testCheckOverflowWhenOverfullMessages() throws Exception
{
QueueEntry lastEntry = createLastEntry();
- when(_queue.getLesserOldestEntry()).thenReturn(lastEntry, (QueueEntry) null);
+ when(_queue.getLeastSignificantOldestEntry()).thenReturn(lastEntry, (QueueEntry) null);
when(_queue.getQueueDepthMessages()).thenReturn(10, 5);
when(_queue.getMaximumQueueDepthMessages()).thenReturn(5L);
when(_queue.getQueueDepthBytesIncludingHeader()).thenReturn(10L, 4L);
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java?rev=1784725&r1=1784724&r2=1784725&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java Tue Feb 28 11:33:05 2017
@@ -421,21 +421,21 @@ public class SortedQueueEntryListTest ex
validateEntry(entry, "D", 2);
}
- public void testGetLesserOldestEntry()
+ public void testGetLeastSignificantOldestEntry()
{
SortedQueueEntryList list = new SortedQueueEntryList(_testQueue, _testQueue.getQueueStatistics());
SortedQueueEntry entry1 = list.add(generateTestMessage(1, "B"), null);
- assertEquals("Unexpected last entry", entry1, list.getLesserOldestEntry());
+ assertEquals("Unexpected last entry", entry1, list.getLeastSignificantOldestEntry());
- SortedQueueEntry entry2 = list.add(generateTestMessage(2, "C"), null);
- assertEquals("Unexpected last entry", entry2, list.getLesserOldestEntry());
+ list.add(generateTestMessage(2, "C"), null);
+ assertEquals("Unexpected last entry", entry1, list.getLeastSignificantOldestEntry());
list.add(generateTestMessage(3, null), null);
- assertEquals("Unexpected last entry", entry2, list.getLesserOldestEntry());
+ assertEquals("Unexpected last entry", entry1, list.getLeastSignificantOldestEntry());
list.add(generateTestMessage(4, "A"), null);
- assertEquals("Unexpected last entry", entry2, list.getLesserOldestEntry());
+ assertEquals("Unexpected last entry", entry1, list.getLeastSignificantOldestEntry());
}
private void validateEntry(final SortedQueueEntry entry, final String expectedSortKey, final long expectedMessageId)
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java?rev=1784725&r1=1784724&r2=1784725&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java Tue Feb 28 11:33:05 2017
@@ -282,13 +282,13 @@ public class StandardQueueEntryListTest
StandardQueueEntryList queueEntryList = new StandardQueueEntryList(_testQueue, _testQueue.getQueueStatistics());
QueueEntry entry1 = queueEntryList.add(createServerMessage(1), null);
- assertEquals("Unexpected last message", entry1, queueEntryList.getLesserOldestEntry());
+ assertEquals("Unexpected last message", entry1, queueEntryList.getLeastSignificantOldestEntry());
queueEntryList.add(createServerMessage(2), null);
- assertEquals("Unexpected last message", entry1, queueEntryList.getLesserOldestEntry());
+ assertEquals("Unexpected last message", entry1, queueEntryList.getLeastSignificantOldestEntry());
queueEntryList.add(createServerMessage(3), null);
- assertEquals("Unexpected last message", entry1, queueEntryList.getLesserOldestEntry());
+ assertEquals("Unexpected last message", entry1, queueEntryList.getLeastSignificantOldestEntry());
}
private ServerMessage createServerMessage(final long id)
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java?rev=1784725&r1=1784724&r2=1784725&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java Tue Feb 28 11:33:05 2017
@@ -241,7 +241,7 @@ public class StandardQueueTest extends A
@Override
- public QueueEntry getLesserOldestEntry()
+ public QueueEntry getLeastSignificantOldestEntry()
{
return getOldestEntry();
}
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecovererTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecovererTest.java?rev=1784725&r1=1784724&r2=1784725&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecovererTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecovererTest.java Tue Feb 28 11:33:05 2017
@@ -29,6 +29,7 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
+import org.apache.qpid.server.model.OverflowPolicy;
import org.apache.qpid.server.model.VirtualHostNode;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -76,6 +77,10 @@ public class VirtualHostStoreUpgraderAnd
assertEquals("Unexpected queue.queueFlowResumeLimit",
"70.00",
((Map<String, String>) upgradedAttributes.get("context")).get("queue.queueFlowResumeLimit"));
+
+ assertEquals("Unexpected overflowPolicy",
+ OverflowPolicy.PRODUCER_FLOW_CONTROL.name(),
+ String.valueOf(upgradedAttributes.get("overflowPolicy")));
}
private ConfiguredObjectRecord findRecordById(UUID id, List<ConfiguredObjectRecord> records)
Modified: qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/common/util.js
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/common/util.js?rev=1784725&r1=1784724&r2=1784725&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/common/util.js (original)
+++ qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/common/util.js Tue Feb 28 11:33:05 2017
@@ -603,16 +603,16 @@ define(["dojo/_base/xhr",
util.applyMetadataToWidgets = function (domRoot, category, type, meta)
{
this.applyToWidgets(domRoot, category, type, null, meta);
- }
+ };
- util.applyToWidgets = function (domRoot, category, type, data, meta)
+ util.applyToWidgets = function (domRoot, category, type, data, meta, effectiveData)
{
var widgets = util.findAllWidgets(domRoot);
array.forEach(widgets, function (widget)
{
- widgetconfigurer.config(widget, category, type, data, meta);
+ widgetconfigurer.config(widget, category, type, data, meta, effectiveData);
});
- }
+ };
util.disableWidgetsForImmutableFields = function (domRoot, category, type, meta)
{
@@ -675,6 +675,13 @@ define(["dojo/_base/xhr",
}
else
{
+ if (widget.hasOwnProperty("effectiveDefaultValue") &&
+ value === widget.effectiveDefaultValue && initialData &&
+ (initialData[propName] === null || initialData[propName] === undefined))
+ {
+ // widget value is effective default value, thus, skipping it...
+ continue;
+ }
values[propName] = value ? value : null;
}
}
Modified: qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/common/widgetconfigurer.js
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/common/widgetconfigurer.js?rev=1784725&r1=1784724&r2=1784725&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/common/widgetconfigurer.js (original)
+++ qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/common/widgetconfigurer.js Tue Feb 28 11:33:05 2017
@@ -98,7 +98,7 @@ define(["dojo/_base/xhr",
}
}
},
- _processWidgetValue: function (widget, category, type, data, meta)
+ _processWidgetValue: function (widget, category, type, data, meta, effectiveData)
{
var widgetName = widget.name;
if (widgetName)
@@ -121,7 +121,22 @@ define(["dojo/_base/xhr",
if (widget instanceof dijit.form.FilteringSelect || widget instanceof dojox.form.CheckedMultiSelect)
{
- var widgetValue = dataValue == null ? defaultValue : dataValue;
+ var widgetValue = dataValue;
+ if ((dataValue === null || dataValue === undefined)
+ && defaultValue !== null && defaultValue !== undefined)
+ {
+ if (new String(defaultValue).indexOf("${") == -1)
+ {
+ widgetValue = defaultValue;
+ }
+ else if (effectiveData && effectiveData.hasOwnProperty(widgetName)
+ && effectiveData[widgetName] !== null && effectiveData[widgetName] !== undefined)
+ {
+ widgetValue = effectiveData[widgetName];
+ widget.effectiveDefaultValue = widgetValue;
+ }
+ }
+
if (widgetValue)
{
widget.set("value", widgetValue);
@@ -141,12 +156,12 @@ define(["dojo/_base/xhr",
}
}
},
- config: function (widget, category, type, data, meta)
+ config: function (widget, category, type, data, meta, effectiveData)
{
this._processWidgetPrompt(widget, category, type, meta);
if (data != null)
{
- this._processWidgetValue(widget, category, type, data, meta);
+ this._processWidgetValue(widget, category, type, data, meta, effectiveData);
}
},
disableIfImmutable: function (widget, category, type, meta)
Modified: qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js?rev=1784725&r1=1784724&r2=1784725&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js (original)
+++ qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js Tue Feb 28 11:33:05 2017
@@ -458,6 +458,19 @@ define(["dojo/_base/declare",
}
+ function renderMaximumQueueDepth(value)
+ {
+ if (util.isInteger(value))
+ {
+ if (value < 0)
+ {
+ return "<unlimited>";
+ }
+ return new String(value);
+ }
+ return "";
+ }
+
QueueUpdater.prototype.updateHeader = function ()
{
@@ -502,9 +515,8 @@ define(["dojo/_base/declare",
}
this["overflowPolicy"].innerHTML = entities.encode(this.queueData["overflowPolicy"]);
- this["maximumQueueDepthBytes"].innerHTML = this.queueData["maximumQueueDepthBytes"] ? entities.encode(new String(this.queueData["maximumQueueDepthBytes"])) : "";
- this["maximumQueueDepthMessages"].innerHTML = this.queueData["maximumQueueDepthMessages"] ? entities.encode(new String(this.queueData["maximumQueueDepthMessages"])) : "";
-
+ this["maximumQueueDepthBytes"].innerHTML = renderMaximumQueueDepth(this.queueData.maximumQueueDepthBytes);
+ this["maximumQueueDepthMessages"].innerHTML = renderMaximumQueueDepth(this.queueData.maximumQueueDepthMessages);
if (this.queueData["messageGroupKey"])
{
this.messageGroupKey.innerHTML = entities.encode(String(this.queueData["messageGroupKey"]));
Modified: qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/management/editQueue.js
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/management/editQueue.js?rev=1784725&r1=1784724&r2=1784725&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/management/editQueue.js (original)
+++ qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/management/editQueue.js Tue Feb 28 11:33:05 2017
@@ -152,7 +152,9 @@ define(["dojox/html/entities",
"Queue",
data.actual.type,
data.actual,
- this.management.metadata);
+ this.management.metadata,
+ data.effective
+ );
this.context.setData(data.actual.context, data.effective.context, data.inheritedActual.context);
Modified: qpid/java/trunk/doc/java-broker/src/docbkx/concepts/Java-Broker-Concepts-Queues.xml
URL: http://svn.apache.org/viewvc/qpid/java/trunk/doc/java-broker/src/docbkx/concepts/Java-Broker-Concepts-Queues.xml?rev=1784725&r1=1784724&r2=1784725&view=diff
==============================================================================
--- qpid/java/trunk/doc/java-broker/src/docbkx/concepts/Java-Broker-Concepts-Queues.xml (original)
+++ qpid/java/trunk/doc/java-broker/src/docbkx/concepts/Java-Broker-Concepts-Queues.xml Tue Feb 28 11:33:05 2017
@@ -400,9 +400,7 @@ amqp://guest:guest@client1/development?m
<emphasis>Ring</emphasis>
- The oldest messages are removed. For a
<link linkend="Java-Broker-Concepts-Queues-Types-Priority">Priority Queue</link>
- oldest messages with lowest priorities are removed. For a
- <link linkend="Java-Broker-Concepts-Queues-Types-Sorted">Sorted Queue</link>
- oldest messages with lowest keys are removed.
+ oldest messages with lowest priorities are removed.
</para>
</listitem>
<listitem>
Modified: qpid/java/trunk/systests/qpid-systests-jms_2.0/src/test/java/org/apache/qpid/systests/jms_2_0/deliverydelay/DeliveryDelayTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/qpid-systests-jms_2.0/src/test/java/org/apache/qpid/systests/jms_2_0/deliverydelay/DeliveryDelayTest.java?rev=1784725&r1=1784724&r2=1784725&view=diff
==============================================================================
--- qpid/java/trunk/systests/qpid-systests-jms_2.0/src/test/java/org/apache/qpid/systests/jms_2_0/deliverydelay/DeliveryDelayTest.java (original)
+++ qpid/java/trunk/systests/qpid-systests-jms_2.0/src/test/java/org/apache/qpid/systests/jms_2_0/deliverydelay/DeliveryDelayTest.java Tue Feb 28 11:33:05 2017
@@ -33,6 +33,7 @@ import javax.jms.InvalidDestinationRunti
import javax.jms.JMSConsumer;
import javax.jms.JMSContext;
import javax.jms.JMSProducer;
+import javax.jms.JMSRuntimeException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
@@ -102,9 +103,10 @@ public class DeliveryDelayTest extends Q
producer.send(queue, "message");
fail("Exception not thrown");
}
- catch (InvalidDestinationRuntimeException e)
+ catch (JMSRuntimeException e)
{
- // PASS
+ assertTrue("Unexpected exception message: " + e.getMessage(),
+ e.getMessage().contains("amqp:precondition-failed"));
}
}
}
@@ -142,9 +144,10 @@ public class DeliveryDelayTest extends Q
producer.send(publishDest, "message with delivery delay");
fail("Exception not thrown");
}
- catch (InvalidDestinationRuntimeException e)
+ catch (JMSRuntimeException e)
{
- // PASS
+ assertTrue("Unexpected exception message: " + e.getMessage(),
+ e.getMessage().contains("amqp:precondition-failed"));
}
}
}
Modified: qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/AmqpManagementFacade.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/AmqpManagementFacade.java?rev=1784725&r1=1784724&r2=1784725&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/AmqpManagementFacade.java (original)
+++ qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/AmqpManagementFacade.java Tue Feb 28 11:33:05 2017
@@ -22,6 +22,7 @@ package org.apache.qpid.test.utils;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -170,6 +171,71 @@ public class AmqpManagementFacade
}
}
+ public Map<String, Object> readEntityUsingAmqpManagement(final Session session,
+ final String type,
+ final String name,
+ final boolean actuals) throws JMSException
+ {
+ MessageProducer producer = session.createProducer(session.createQueue(_qpidBrokerTestCase.isBroker10()
+ ? "$management"
+ : "ADDR:$management"));
+
+ final TemporaryQueue responseQueue = session.createTemporaryQueue();
+ MessageConsumer consumer = session.createConsumer(responseQueue);
+
+ MapMessage request = session.createMapMessage();
+ request.setStringProperty("type", type);
+ request.setStringProperty("operation", "READ");
+ request.setString("name", name);
+ request.setString("object-path", name);
+ request.setStringProperty("index", "object-path");
+ request.setStringProperty("key", name);
+ request.setBooleanProperty("actuals", actuals);
+ request.setJMSReplyTo(responseQueue);
+
+ producer.send(request);
+ if (session.getTransacted())
+ {
+ session.commit();
+ }
+
+ Message response = consumer.receive(5000);
+ if (session.getTransacted())
+ {
+ session.commit();
+ }
+ try
+ {
+ if (response instanceof MapMessage)
+ {
+ MapMessage bodyMap = (MapMessage) response;
+ Map<String, Object> data = new HashMap<>();
+ Enumeration<String> keys = bodyMap.getMapNames();
+ while (keys.hasMoreElements())
+ {
+ String key = keys.nextElement();
+ data.put(key, bodyMap.getObject(key));
+ }
+ return data;
+ }
+ else if (response instanceof ObjectMessage)
+ {
+ Object body = ((ObjectMessage) response).getObject();
+ if (body instanceof Map)
+ {
+ Map<String, ?> bodyMap = (Map<String, ?>) body;
+ return new HashMap<>(bodyMap);
+ }
+ }
+ throw new IllegalArgumentException("Cannot parse the results from a management read");
+ }
+ finally
+ {
+ consumer.close();
+ responseQueue.delete();
+ }
+ }
+
private List<Map<String, Object>> getResultsAsMap(final List<String> attributeNames, final List<List<Object>> attributeValues)
{
List<Map<String, Object>> results = new ArrayList<>();
Modified: qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java?rev=1784725&r1=1784724&r2=1784725&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java (original)
+++ qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java Tue Feb 28 11:33:05 2017
@@ -373,6 +373,11 @@ public class QpidBrokerTestCase extends
return _managementFacade.managementQueryObjects(session, type);
}
+ protected Map<String, Object> managementReadObject(Session session, String type, String name, boolean actuals) throws JMSException
+ {
+ return _managementFacade.readEntityUsingAmqpManagement(session, type, name, actuals);
+ }
+
public long getQueueDepth(final Connection con, final Queue destination) throws Exception
{
return _jmsProvider.getQueueDepth(con, destination);
Modified: qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java?rev=1784725&r1=1784724&r2=1784725&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java (original)
+++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java Tue Feb 28 11:33:05 2017
@@ -26,6 +26,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.BytesMessage;
@@ -64,6 +65,8 @@ public class ProducerFlowControlTest ext
private RestTestHelper _restTestHelper;
private final AtomicInteger _sentMessages = new AtomicInteger(0);
+ private int _messageSizeIncludingHeader;
+ private Session _utilitySession;
public void setUp() throws Exception
{
@@ -73,6 +76,11 @@ public class ProducerFlowControlTest ext
_restTestHelper = new RestTestHelper(getDefaultBroker().getHttpPort());
_monitor.markDiscardPoint();
+ if (!isBroker10())
+ {
+ setSystemProperty("sync_publish", "all");
+ }
+
_producerConnection = getConnection();
_producerSession = _producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -81,6 +89,16 @@ public class ProducerFlowControlTest ext
_consumerConnection = getConnection();
_consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final Connection utilityConnection = getConnection();
+ utilityConnection.start();
+ _utilitySession = utilityConnection.createSession(true, Session.SESSION_TRANSACTED);
+ String tmpQueueName = getTestQueueName() + "_Tmp";
+ Queue tmpQueue = createTestQueue(_utilitySession, tmpQueueName);
+ MessageProducer tmpQueueProducer= _utilitySession.createProducer(tmpQueue);
+ tmpQueueProducer.send(nextMessage(0, _utilitySession));
+ _utilitySession.commit();
+
+ _messageSizeIncludingHeader = getQueueDepthBytes(tmpQueueName);
}
public void tearDown() throws Exception
@@ -107,45 +125,46 @@ public class ProducerFlowControlTest ext
{
String queueName = getTestQueueName();
- createAndBindQueueWithFlowControlEnabled(_producerSession, queueName, 1000, 800);
+ int capacity = _messageSizeIncludingHeader * 3 + _messageSizeIncludingHeader / 2;
+ int resumeCapacity = _messageSizeIncludingHeader * 2;
+ createAndBindQueueWithFlowControlEnabled(_producerSession, queueName, capacity, resumeCapacity);
_producer = _producerSession.createProducer(_queue);
// try to send 5 messages (should block after 4)
- sendMessagesAsync(_producer, _producerSession, 5, 50L);
-
- Thread.sleep(5000);
+ CountDownLatch sendLatch = sendMessagesAsync(_producer, _producerSession, 5, 5L).getSendLatch();
+ assertTrue("Flow is not stopped", awaitAttributeValue(queueName, "queueFlowStopped", true, 5000));
assertEquals("Incorrect number of message sent before blocking", 4, _sentMessages.get());
_consumer = _consumerSession.createConsumer(_queue);
_consumerConnection.start();
+ Message message = _consumer.receive(RECEIVE_TIMEOUT);
+ assertNotNull("Message is not received", message);
- _consumer.receive();
-
- Thread.sleep(1000);
+ assertFalse("Flow is not stopped", awaitAttributeValue(queueName, "queueFlowStopped", false, 1000));
assertEquals("Message incorrectly sent after one message received", 4, _sentMessages.get());
-
- _consumer.receive();
-
- Thread.sleep(1000);
-
+ Message message2 = _consumer.receive(RECEIVE_TIMEOUT);
+ assertNotNull("Message is not received", message2);
+ assertTrue("Message sending is not finished", sendLatch.await(1000, TimeUnit.MILLISECONDS));
assertEquals("Message not sent after two messages received", 5, _sentMessages.get());
-
}
public void testBrokerLogMessages() throws Exception
{
String queueName = getTestQueueName();
-
- createAndBindQueueWithFlowControlEnabled(_producerSession, queueName, 1000, 800);
+
+ int capacity = _messageSizeIncludingHeader * 3 + _messageSizeIncludingHeader / 2;
+ int resumeCapacity = _messageSizeIncludingHeader * 2;
+
+ createAndBindQueueWithFlowControlEnabled(_producerSession, queueName, capacity, resumeCapacity);
_producer = _producerSession.createProducer(_queue);
// try to send 5 messages (should block after 4)
- sendMessagesAsync(_producer, _producerSession, 5, 50L);
+ sendMessagesAsync(_producer, _producerSession, 5, 5L);
List<String> results = waitAndFindMatches("QUE-1003", 7000);
@@ -165,24 +184,28 @@ public class ProducerFlowControlTest ext
public void testFlowControlOnCapacityResumeEqual() throws Exception
{
String queueName = getTestQueueName();
-
- createAndBindQueueWithFlowControlEnabled(_producerSession, queueName, 1000, 1000);
+
+ int capacity = _messageSizeIncludingHeader * 3 + _messageSizeIncludingHeader / 2;
+ createAndBindQueueWithFlowControlEnabled(_producerSession, queueName,
+ capacity,
+ capacity);
_producer = _producerSession.createProducer(_queue);
// try to send 5 messages (should block after 4)
- sendMessagesAsync(_producer, _producerSession, 5, 50L);
+ CountDownLatch sendLatch = sendMessagesAsync(_producer, _producerSession, 5, 5L).getSendLatch();
- Thread.sleep(5000);
+ assertTrue("Flow is not stopped", awaitAttributeValue(queueName, "queueFlowStopped", true,5000));
assertEquals("Incorrect number of message sent before blocking", 4, _sentMessages.get());
_consumer = _consumerSession.createConsumer(_queue);
_consumerConnection.start();
- _consumer.receive();
+ Message message = _consumer.receive(RECEIVE_TIMEOUT);
+ assertNotNull("Message is not received", message);
- Thread.sleep(1000);
+ assertTrue("Message sending is not finished", sendLatch.await(1000, TimeUnit.MILLISECONDS));
assertEquals("Message incorrectly sent after one message received", 5, _sentMessages.get());
@@ -198,7 +221,9 @@ public class ProducerFlowControlTest ext
final int numProducers = 10;
final int numMessages = 100;
- createAndBindQueueWithFlowControlEnabled(_producerSession, queueName, 6000, 3000);
+ final int capacity = _messageSizeIncludingHeader * 20;
+
+ createAndBindQueueWithFlowControlEnabled(_producerSession, queueName, capacity, capacity/2);
_consumerConnection.start();
@@ -211,7 +236,7 @@ public class ProducerFlowControlTest ext
Session session = producers[i].createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer myproducer = session.createProducer(_queue);
- MessageSender sender = sendMessagesAsync(myproducer, session, numMessages, 50L);
+ MessageSender sender = sendMessagesAsync(myproducer, session, numMessages, 5L);
}
_consumer = _consumerSession.createConsumer(_queue);
@@ -221,7 +246,6 @@ public class ProducerFlowControlTest ext
{
Message msg = _consumer.receive(5000);
- Thread.sleep(50L);
assertNotNull("Message not received("+j+"), sent: "+_sentMessages.get(), msg);
}
@@ -301,9 +325,9 @@ public class ProducerFlowControlTest ext
private int getQueueDepthBytes(final String queueName) throws IOException
{
// On AMQP 1.0 the size of the message on the broker is not necessarily the size of the message we sent. Therefore, get the actual size from the broker
- final String requestUrl = String.format("queue/%1$s/%1$s/%2$s/getStatistics?statistics=[\"queueDepthBytes\"]", TestBrokerConfiguration.ENTRY_NAME_VIRTUAL_HOST, queueName);
+ final String requestUrl = String.format("queue/%1$s/%1$s/%2$s/getStatistics?statistics=[\"queueDepthBytesIncludingHeader\"]", TestBrokerConfiguration.ENTRY_NAME_VIRTUAL_HOST, queueName);
final Map<String, Object> queueAttributes = _restTestHelper.getJsonAsMap(requestUrl);
- return ((Number) queueAttributes.get("queueDepthBytes")).intValue();
+ return ((Number) queueAttributes.get("queueDepthBytesIncludingHeader")).intValue();
}
private void waitForFlowControlAndMessageCount(final String queueUrl, final int messageCount, final int timeout) throws InterruptedException, IOException
@@ -341,29 +365,28 @@ public class ProducerFlowControlTest ext
public void testQueueDeleteWithBlockedFlow() throws Exception
{
String queueName = getTestQueueName();
- createAndBindQueueWithFlowControlEnabled(_producerSession, queueName, 1000, 800, true, false);
+ int capacity = _messageSizeIncludingHeader * 3 + _messageSizeIncludingHeader / 2;
+ int resumeCapacity = _messageSizeIncludingHeader * 2;
+ createAndBindQueueWithFlowControlEnabled(_producerSession, queueName, capacity, resumeCapacity, true, false);
_producer = _producerSession.createProducer(_queue);
// try to send 5 messages (should block after 4)
- sendMessagesAsync(_producer, _producerSession, 5, 50L);
+ sendMessagesAsync(_producer, _producerSession, 5, 5L);
- Thread.sleep(5000);
+ assertTrue("Flow is not stopped", awaitAttributeValue(queueName, "queueFlowStopped", true,5000));
assertEquals("Incorrect number of message sent before blocking", 4, _sentMessages.get());
- // close blocked producer session and connection
- _producerConnection.close();
-
if(!isBroker10())
{
// delete queue with a consumer session
- ((AMQSession<?, ?>) _consumerSession).sendQueueDelete(queueName);
+ ((AMQSession<?, ?>) _utilitySession).sendQueueDelete(queueName);
}
else
{
- deleteEntityUsingAmqpManagement(getTestQueueName(), _consumerSession, "org.apache.qpid.Queue");
- createTestQueue(_consumerSession);
+ deleteEntityUsingAmqpManagement(getTestQueueName(), _utilitySession, "org.apache.qpid.Queue");
+ createTestQueue(_utilitySession);
}
_consumer = _consumerSession.createConsumer(_queue);
_consumerConnection.start();
@@ -432,7 +455,7 @@ public class ProducerFlowControlTest ext
private final Session _senderSession;
private final int _numMessages;
private volatile JMSException _exception;
- private CountDownLatch _exceptionThrownLatch = new CountDownLatch(1);
+ private CountDownLatch _sendLatch = new CountDownLatch(1);
private long _sleepPeriod;
public MessageSender(MessageProducer producer, Session producerSession, int numMessages, long sleepPeriod)
@@ -452,10 +475,18 @@ public class ProducerFlowControlTest ext
catch (JMSException e)
{
_exception = e;
- _exceptionThrownLatch.countDown();
+ }
+ finally
+ {
+ _sendLatch.countDown();
}
}
+ public CountDownLatch getSendLatch()
+ {
+ return _sendLatch;
+ }
+
private void sendMessages(MessageProducer producer, Session producerSession, int numMessages, long sleepPeriod)
throws JMSException
{
@@ -500,15 +531,50 @@ public class ProducerFlowControlTest ext
}
}
- private final byte[] BYTE_300 = new byte[300];
+ }
+
+ private final byte[] BYTE_300 = new byte[300];
- private Message nextMessage(int msg, Session producerSession) throws JMSException
+ private Message nextMessage(int msg, Session producerSession) throws JMSException
+ {
+ BytesMessage send = producerSession.createBytesMessage();
+ send.writeBytes(BYTE_300);
+ send.setIntProperty("msg", msg);
+ return send;
+ }
+
+ private boolean awaitAttributeValue(String queueName, String attributeName, Object expectedValue, long timeout)
+ throws JMSException, InterruptedException
+ {
+ long startTime = System.currentTimeMillis();
+ long endTime = startTime + timeout;
+ boolean found = false;
+ do
{
- BytesMessage send = producerSession.createBytesMessage();
- send.writeBytes(BYTE_300);
- send.setIntProperty("msg", msg);
+ Map<String, Object> attributes =
+ managementReadObject(_utilitySession, "org.apache.qpid.SortedQueue", queueName, false);
+ Object actualValue = attributes.get(attributeName);
+ if (expectedValue == null)
+ {
+ found = actualValue == null;
+ }
+ else if (actualValue != null)
+ {
+ if (actualValue.getClass() == expectedValue.getClass())
+ {
+ found = expectedValue.equals(actualValue);
+ }
+ else
+ {
+ found = String.valueOf(expectedValue).equals(String.valueOf(actualValue));
+ }
+ }
- return send;
- }
+ if (!found)
+ {
+ Thread.sleep(50);
+ }
+ } while (!found && System.currentTimeMillis() <= endTime);
+ return found;
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org