You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/02/23 17:11:05 UTC
svn commit: r1784178 [2/3] - in /qpid/java/trunk:
broker-core/src/main/java/org/apache/qpid/server/logging/messages/
broker-core/src/main/java/org/apache/qpid/server/message/
broker-core/src/main/java/org/apache/qpid/server/model/
broker-core/src/main/...
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java?rev=1784178&r1=1784177&r2=1784178&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java Thu Feb 23 17:11:04 2017
@@ -38,7 +38,6 @@ import org.apache.qpid.server.model.Brok
import org.apache.qpid.server.model.ConfiguredObjectFactory;
import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl;
import org.apache.qpid.server.model.Queue;
-import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -208,6 +207,23 @@ public class LastValueQueueListTest exte
assertEquals(0, _list.getLatestValuesMap().size());
}
+    /**
+     * Checks which entry {@code getLesserOldestEntry()} reports while messages carrying
+     * {@code TEST_KEY_VALUE1} and {@code TEST_KEY_VALUE2} are added alternately to a fresh list.
+     */
+    public void testGetLesserOldestEntry()
+    {
+        LastValueQueueList list = new LastValueQueueList(_queue, _queue.getQueueStatistics());
+
+        // With a single entry in the list, that entry is the one expected back.
+        QueueEntry firstForKey1 = list.add(createTestServerMessage(TEST_KEY_VALUE1), null);
+        assertEquals("Unexpected last message", firstForKey1, list.getLesserOldestEntry());
+
+        // Adding an entry for the other key value is not expected to change the answer.
+        QueueEntry firstForKey2 = list.add(createTestServerMessage(TEST_KEY_VALUE2), null);
+        assertEquals("Unexpected last message", firstForKey1, list.getLesserOldestEntry());
+
+        // A second TEST_KEY_VALUE1 message: the TEST_KEY_VALUE2 entry is now expected
+        // (presumably the earlier TEST_KEY_VALUE1 entry has been superseded - TODO confirm).
+        QueueEntry secondForKey1 = list.add(createTestServerMessage(TEST_KEY_VALUE1), null);
+        assertEquals("Unexpected last message", firstForKey2, list.getLesserOldestEntry());
+
+        // A second TEST_KEY_VALUE2 message: the newer TEST_KEY_VALUE1 entry is now expected.
+        list.add(createTestServerMessage(TEST_KEY_VALUE2), null);
+        assertEquals("Unexpected last message", secondForKey1, list.getLesserOldestEntry());
+    }
+
private int countEntries(LastValueQueueList list)
{
QueueEntryIterator iterator =
@@ -237,12 +253,4 @@ public class LastValueQueueListTest exte
return mockMessage;
}
- private Queue<?> createTestQueue()
- {
- Queue<?> queue = mock(Queue.class);
- VirtualHost virtualHost = mock(VirtualHost.class);
- when(queue.getVirtualHost()).thenReturn(virtualHost);
-
- return queue;
- }
}
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java?rev=1784178&r1=1784177&r2=1784178&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java Thu Feb 23 17:11:04 2017
@@ -162,4 +162,24 @@ public class PriorityQueueListTest exten
assertEquals("second message with priority 4 should be 'later' than second message of priority 5",
1, _priority4message2.compareTo(_priority5message2));
}
+
+    /**
+     * Verifies the entry reported by {@code getLesserOldestEntry()} for the fixture list, and again
+     * after a message with priority 3 has been added to it.
+     */
+    public void testGetLesserOldestEntry()
+    {
+        // Expectation for the list as prepared by the test fixture.
+        assertEquals("Unexpected last entry", _priority4message1, _list.getLesserOldestEntry());
+
+        AMQMessageHeader priority3Header = mock(AMQMessageHeader.class);
+        when(priority3Header.getPriority()).thenReturn((byte)3);
+
+        ServerMessage<?> priority3Message = mock(ServerMessage.class);
+        @SuppressWarnings({ "rawtypes", "unchecked" })
+        MessageReference<ServerMessage> reference = mock(MessageReference.class);
+        when(reference.getMessage()).thenReturn(priority3Message);
+
+        when(priority3Message.getMessageHeader()).thenReturn(priority3Header);
+        when(priority3Message.newReference()).thenReturn(reference);
+        when(priority3Message.newReference(any(TransactionLogResource.class))).thenReturn(reference);
+
+        // Once added, the priority 3 entry is the one the test expects to be reported.
+        QueueEntry priority3Entry = _list.add(priority3Message, null);
+
+        assertEquals("Unexpected last entry", priority3Entry, _list.getLesserOldestEntry());
+    }
}
Added: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandlerTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandlerTest.java?rev=1784178&view=auto
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandlerTest.java (added)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandlerTest.java Thu Feb 23 17:11:04 2017
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.server.queue;
+
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.security.PrivilegedAction;
+import java.util.Collections;
+
+import javax.security.auth.Subject;
+
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+
+import org.apache.qpid.server.connection.SessionPrincipal;
+import org.apache.qpid.server.logging.EventLogger;
+import org.apache.qpid.server.logging.LogMessage;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.messages.QueueMessages;
+import org.apache.qpid.server.model.OverflowPolicy;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.session.AMQPSession;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+/**
+ * Unit tests for {@link ProducerFlowControlOverflowPolicyHandler}.
+ * <p>
+ * The handler is exercised against a mocked {@link AbstractQueue} whose depth and limit getters are
+ * stubbed per test. Session blocking/unblocking and the logged operational messages are checked
+ * with Mockito verifications.
+ */
+public class ProducerFlowControlOverflowPolicyHandlerTest extends QpidTestCase
+{
+    private ProducerFlowControlOverflowPolicyHandler _producerFlowControlOverflowPolicyHandler;
+    private Queue<?> _queue;
+    private EventLogger _eventLogger;
+    private LogSubject _subject;
+
+    /**
+     * Builds the handler under test around a mocked queue that starts empty, has no depth limits
+     * (both maximums are -1) and reports a flow resume limit context value of 80.0.
+     */
+    @Override
+    public void setUp() throws Exception
+    {
+        super.setUp();
+
+        _eventLogger = mock(EventLogger.class);
+        _subject = mock(LogSubject.class);
+
+        _queue = mock(AbstractQueue.class);
+        when(_queue.getMaximumQueueDepthBytes()).thenReturn(-1L);
+        when(_queue.getMaximumQueueDepthMessages()).thenReturn(-1L);
+        when(_queue.getOverflowPolicy()).thenReturn(OverflowPolicy.PRODUCER_FLOW_CONTROL);
+        when(_queue.getContextValue(Double.class, Queue.QUEUE_FLOW_RESUME_LIMIT)).thenReturn(80.0);
+        when(_queue.getQueueDepthBytes()).thenReturn(0L);
+        when(_queue.getQueueDepthMessages()).thenReturn(0);
+        when(_queue.getLogSubject()).thenReturn(_subject);
+
+        _producerFlowControlOverflowPolicyHandler = new ProducerFlowControlOverflowPolicyHandler(_queue, _eventLogger);
+    }
+
+    /** A byte depth of 11 against a limit of 10 must block the session and log OVERFULL. */
+    public void testHandleOverflowBlocksOverfullBytes() throws Exception
+    {
+        AMQPSession<?, ?> session = mock(AMQPSession.class);
+        when(_queue.getQueueDepthBytes()).thenReturn(11L);
+        when(_queue.getMaximumQueueDepthBytes()).thenReturn(10L);
+
+        checkOverflow(session);
+
+        verify(session, times(1)).block(_queue);
+        LogMessage logMessage = QueueMessages.OVERFULL(11, 10, 0, -1);
+        verify(_eventLogger).message(same(_subject), argThat(new LogMessageMatcher(logMessage)));
+    }
+
+    /** A message depth of 11 against a limit of 10 must block the session and log OVERFULL. */
+    public void testHandleOverflowBlocksOverfullMessages() throws Exception
+    {
+        AMQPSession<?, ?> session = mock(AMQPSession.class);
+
+        when(_queue.getMaximumQueueDepthMessages()).thenReturn(10L);
+        when(_queue.getQueueDepthMessages()).thenReturn(11);
+        checkOverflow(session);
+        verify(session, times(1)).block(_queue);
+
+        LogMessage logMessage = QueueMessages.OVERFULL(0, -1, 11, 10);
+        verify(_eventLogger).message(same(_subject), argThat(new LogMessageMatcher(logMessage)));
+    }
+
+    /** The handler reports flow as stopped only after an overflow check on an overfull queue. */
+    public void testIsQueueFlowStopped() throws Exception
+    {
+        assertFalse("Flow should not be stopped", _producerFlowControlOverflowPolicyHandler.isQueueFlowStopped());
+
+        when(_queue.getQueueDepthBytes()).thenReturn(11L);
+        when(_queue.getMaximumQueueDepthBytes()).thenReturn(10L);
+
+        checkOverflow(mock(AMQPSession.class));
+
+        assertTrue("Flow should be stopped", _producerFlowControlOverflowPolicyHandler.isQueueFlowStopped());
+    }
+
+    /**
+     * After blocking on a byte depth of 11 (limit 10), a later check at a depth of 8 must unblock
+     * the session, clear the stopped flag and log UNDERFULL.
+     */
+    public void testCheckCapacityResumesFlowForBytes() throws Exception
+    {
+        AMQPSession<?, ?> session = mock(AMQPSession.class);
+        when(_queue.getQueueDepthBytes()).thenReturn(11L);
+        when(_queue.getMaximumQueueDepthBytes()).thenReturn(10L);
+
+        checkOverflow(session);
+
+        assertTrue("Flow should be stopped", _producerFlowControlOverflowPolicyHandler.isQueueFlowStopped());
+        when(_queue.getQueueDepthBytes()).thenReturn(8L);
+
+        // The second check is deliberately made outside of any session subject.
+        _producerFlowControlOverflowPolicyHandler.checkOverflow();
+
+        verify(session, times(1)).unblock(_queue);
+        assertFalse("Flow should not be stopped", _producerFlowControlOverflowPolicyHandler.isQueueFlowStopped());
+        LogMessage logMessage = QueueMessages.UNDERFULL(8, 8, 0, -1);
+        verify(_eventLogger).message(same(_subject), argThat(new LogMessageMatcher(logMessage)));
+    }
+
+    /**
+     * After blocking on a message depth of 11 (limit 10), a later check at a depth of 8 must
+     * unblock the session, clear the stopped flag and log UNDERFULL.
+     */
+    public void testCheckCapacityResumesFlowForMessages() throws Exception
+    {
+        AMQPSession<?, ?> session = mock(AMQPSession.class);
+        when(_queue.getQueueDepthMessages()).thenReturn(11);
+        when(_queue.getMaximumQueueDepthMessages()).thenReturn(10L);
+
+        checkOverflow(session);
+
+        assertTrue("Flow should be stopped", _producerFlowControlOverflowPolicyHandler.isQueueFlowStopped());
+        when(_queue.getQueueDepthMessages()).thenReturn(8);
+
+        // The second check is deliberately made outside of any session subject.
+        _producerFlowControlOverflowPolicyHandler.checkOverflow();
+
+        verify(session, times(1)).unblock(_queue);
+        assertFalse("Flow should not be stopped", _producerFlowControlOverflowPolicyHandler.isQueueFlowStopped());
+        LogMessage logMessage = QueueMessages.UNDERFULL(0, -1, 8, 8);
+        verify(_eventLogger).message(same(_subject), argThat(new LogMessageMatcher(logMessage)));
+    }
+
+    /**
+     * Invokes {@code checkOverflow()} on the handler inside {@link Subject#doAs}, using a subject
+     * whose only principal is a {@link SessionPrincipal} wrapping the given session.
+     *
+     * @param session the session to associate with the calling subject
+     */
+    private void checkOverflow(AMQPSession<?, ?> session)
+    {
+        Subject subject = createSubject(session);
+        Subject.doAs(subject, new PrivilegedAction<Void>()
+        {
+            @Override
+            public Void run()
+            {
+                _producerFlowControlOverflowPolicyHandler.checkOverflow();
+                return null;
+            }
+        });
+    }
+
+    /**
+     * Creates a read-only subject holding a single {@link SessionPrincipal} and no credentials.
+     *
+     * @param session the session wrapped by the principal
+     * @return the new subject
+     */
+    private Subject createSubject(final AMQPSession<?, ?> session)
+    {
+        SessionPrincipal sessionPrincipal = new SessionPrincipal(session);
+        // Collections.emptySet() is the type-safe form of the raw Collections.EMPTY_SET constant.
+        return new Subject(true,
+                           Collections.singleton(sessionPrincipal),
+                           Collections.emptySet(),
+                           Collections.emptySet());
+    }
+
+    /**
+     * Matcher accepting any {@link LogMessage} whose {@code toString()} equals that of the
+     * expected message. The last argument seen is remembered so it can be included in the
+     * description.
+     */
+    public static class LogMessageMatcher extends BaseMatcher<LogMessage>
+    {
+        private final LogMessage _expected;
+        private Object _actual;
+
+        LogMessageMatcher(final LogMessage expected)
+        {
+            _expected = expected;
+        }
+
+        @Override
+        public void describeTo(final Description description)
+        {
+            description.appendText("Expected '");
+            description.appendText(_expected.toString());
+            description.appendText("' but got '");
+            description.appendText(String.valueOf(_actual));
+            description.appendText("'");
+        }
+
+        @Override
+        public boolean matches(final Object argument)
+        {
+            _actual = argument;
+            return argument instanceof LogMessage && _expected.toString().equals(argument.toString());
+        }
+    }
+}
Added: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/RingOverflowPolicyHandlerTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/RingOverflowPolicyHandlerTest.java?rev=1784178&view=auto
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/RingOverflowPolicyHandlerTest.java (added)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/RingOverflowPolicyHandlerTest.java Thu Feb 23 17:11:04 2017
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.server.queue;
+
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.qpid.server.logging.EventLogger;
+import org.apache.qpid.server.logging.LogMessage;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.messages.QueueMessages;
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.OverflowPolicy;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.queue.ProducerFlowControlOverflowPolicyHandlerTest.LogMessageMatcher;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+/**
+ * Unit tests for {@link RingOverflowPolicyHandler}.
+ * <p>
+ * The handler is exercised against a mocked {@link AbstractQueue}; the tests verify which entry is
+ * deleted from the queue and which operational message is logged.
+ */
+public class RingOverflowPolicyHandlerTest extends QpidTestCase
+{
+    private RingOverflowPolicyHandler _ringOverflowPolicyHandler;
+    private Queue<?> _queue;
+    private EventLogger _eventLogger;
+    private LogSubject _subject;
+
+    /**
+     * Builds the handler under test around a mocked queue that starts empty and has no depth
+     * limits (both maximums are -1).
+     */
+    @Override
+    public void setUp() throws Exception
+    {
+        super.setUp();
+
+        _eventLogger = mock(EventLogger.class);
+        _subject = mock(LogSubject.class);
+
+        _queue = mock(AbstractQueue.class);
+        when(_queue.getMaximumQueueDepthBytes()).thenReturn(-1L);
+        when(_queue.getMaximumQueueDepthMessages()).thenReturn(-1L);
+        when(_queue.getOverflowPolicy()).thenReturn(OverflowPolicy.RING);
+        when(_queue.getQueueDepthBytes()).thenReturn(0L);
+        when(_queue.getQueueDepthMessages()).thenReturn(0);
+        when(_queue.getLogSubject()).thenReturn(_subject);
+
+        _ringOverflowPolicyHandler = new RingOverflowPolicyHandler(_queue, _eventLogger);
+    }
+
+    /**
+     * With a byte limit of 5 and a stubbed byte depth of 10 (then 4), the handler must delete the
+     * entry returned by {@code getLesserOldestEntry()} and log DROPPED.
+     */
+    public void testHandleOverflowBytes() throws Exception
+    {
+        QueueEntry lastEntry = createLastEntry();
+        // Consecutive stubbing: first call yields the entry, any later call yields null.
+        when(_queue.getLesserOldestEntry()).thenReturn(lastEntry, (QueueEntry) null);
+        when(_queue.getQueueDepthBytesIncludingHeader()).thenReturn(10L, 4L);
+        when(_queue.getMaximumQueueDepthBytes()).thenReturn(5L);
+        when(_queue.getQueueDepthMessages()).thenReturn(3, 1);
+
+        _ringOverflowPolicyHandler.checkOverflow();
+
+        verify(_queue).deleteEntry(lastEntry);
+
+        LogMessage dropped = QueueMessages.DROPPED(1L, 4, 1, 5, -1);
+        verify(_eventLogger).message(same(_subject), argThat(new LogMessageMatcher(dropped)));
+    }
+
+    /**
+     * With a message limit of 5 and a stubbed message depth of 10 (then 5), the handler must
+     * delete the entry returned by {@code getLesserOldestEntry()} and log DROPPED.
+     */
+    public void testHandleOverflowMessages() throws Exception
+    {
+        QueueEntry lastEntry = createLastEntry();
+        // Consecutive stubbing: first call yields the entry, any later call yields null.
+        when(_queue.getLesserOldestEntry()).thenReturn(lastEntry, (QueueEntry) null);
+        when(_queue.getQueueDepthMessages()).thenReturn(10, 5);
+        when(_queue.getMaximumQueueDepthMessages()).thenReturn(5L);
+        when(_queue.getQueueDepthBytesIncludingHeader()).thenReturn(10L, 4L);
+
+        _ringOverflowPolicyHandler.checkOverflow();
+
+        // Verified on _queue directly, as in testHandleOverflowBytes; no cast is required.
+        verify(_queue).deleteEntry(lastEntry);
+
+        LogMessage dropped = QueueMessages.DROPPED(1, 4, 5, -1, 5);
+        verify(_eventLogger).message(same(_subject), argThat(new LogMessageMatcher(dropped)));
+    }
+
+    /**
+     * Creates a mocked queue entry whose message is a mock exposing a mocked message header.
+     *
+     * @return the mocked entry
+     */
+    private QueueEntry createLastEntry()
+    {
+        AMQMessageHeader oldestMessageHeader = mock(AMQMessageHeader.class);
+        ServerMessage oldestMessage = mock(ServerMessage.class);
+        when(oldestMessage.getMessageHeader()).thenReturn(oldestMessageHeader);
+        QueueEntry oldestEntry = mock(QueueEntry.class);
+        when(oldestEntry.getMessage()).thenReturn(oldestMessage);
+        return oldestEntry;
+    }
+}
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java?rev=1784178&r1=1784177&r2=1784178&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java Thu Feb 23 17:11:04 2017
@@ -421,6 +421,23 @@ public class SortedQueueEntryListTest ex
validateEntry(entry, "D", 2);
}
+    /**
+     * Checks the entry reported by {@code getLesserOldestEntry()} as messages with sort keys
+     * "B", "C", {@code null} and "A" are added, in that order, to a fresh sorted list.
+     */
+    public void testGetLesserOldestEntry()
+    {
+        SortedQueueEntryList sortedList = new SortedQueueEntryList(_testQueue, _testQueue.getQueueStatistics());
+
+        // Only "B" is present, so it is the expected answer.
+        SortedQueueEntry entryB = sortedList.add(generateTestMessage(1, "B"), null);
+        assertEquals("Unexpected last entry", entryB, sortedList.getLesserOldestEntry());
+
+        // Once "C" is present the test expects it to be reported instead of "B".
+        SortedQueueEntry entryC = sortedList.add(generateTestMessage(2, "C"), null);
+        assertEquals("Unexpected last entry", entryC, sortedList.getLesserOldestEntry());
+
+        // Neither a null key nor the key "A" is expected to displace the "C" entry.
+        sortedList.add(generateTestMessage(3, null), null);
+        assertEquals("Unexpected last entry", entryC, sortedList.getLesserOldestEntry());
+
+        sortedList.add(generateTestMessage(4, "A"), null);
+        assertEquals("Unexpected last entry", entryC, sortedList.getLesserOldestEntry());
+    }
+
private void validateEntry(final SortedQueueEntry entry, final String expectedSortKey, final long expectedMessageId)
{
assertEquals("Sorted queue entry value is not as expected",
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java?rev=1784178&r1=1784177&r2=1784178&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java Thu Feb 23 17:11:04 2017
@@ -72,14 +72,8 @@ public class StandardQueueEntryListTest
_sqel = _testQueue.getEntries();
for(int i = 1; i <= 100; i++)
{
- final ServerMessage message = mock(ServerMessage.class);
- when(message.getMessageNumber()).thenReturn((long) i);
- MessageReference ref = mock(MessageReference.class);
- when(ref.getMessage()).thenReturn(message);
- when(message.newReference()).thenReturn(ref);
- when(message.newReference(any(TransactionLogResource.class))).thenReturn(ref);
- final QueueEntry bleh = _sqel.add(message, null);
+ final QueueEntry bleh = _sqel.add(createServerMessage(i), null);
assertNotNull("QE should not have been null", bleh);
}
}
@@ -132,14 +126,7 @@ public class StandardQueueEntryListTest
@Override
public ServerMessage getTestMessageToAdd()
{
- ServerMessage msg = mock(ServerMessage.class);
- MessageReference ref = mock(MessageReference.class);
- when(ref.getMessage()).thenReturn(msg);
- when(msg.getMessageNumber()).thenReturn(1l);
- when(msg.newReference()).thenReturn(ref);
- when(msg.newReference(any(TransactionLogResource.class))).thenReturn(ref);
-
- return msg;
+ return createServerMessage(1);
}
@Override
@@ -159,13 +146,7 @@ public class StandardQueueEntryListTest
//Add messages to generate QueueEntry's
for(int i = 1; i <= 100 ; i++)
{
- ServerMessage message = mock(ServerMessage.class);
- when(message.getMessageNumber()).thenReturn((long) i);
- MessageReference ref = mock(MessageReference.class);
- when(ref.getMessage()).thenReturn(message);
- when(message.newReference()).thenReturn(ref);
- when(message.newReference(any(TransactionLogResource.class))).thenReturn(ref);
- QueueEntry bleh = sqel.add(message, null);
+ QueueEntry bleh = sqel.add(createServerMessage(i), null);
assertNotNull("QE should not have been null", bleh);
entriesMap.put(i,bleh);
}
@@ -262,13 +243,7 @@ public class StandardQueueEntryListTest
// create test entries
for(int i = 0; i < numberOfEntries; i++)
{
- ServerMessage message = mock(ServerMessage.class);
- when(message.getMessageNumber()).thenReturn((long)i);
- final MessageReference reference = mock(MessageReference.class);
- when(reference.getMessage()).thenReturn(message);
- when(message.newReference()).thenReturn(reference);
- when(message.newReference(any(TransactionLogResource.class))).thenReturn(reference);
- entries[i] = (OrderedQueueEntry) queueEntryList.add(message, null);
+ entries[i] = (OrderedQueueEntry) queueEntryList.add(createServerMessage(i), null);
}
// test getNext for not acquired entries
@@ -301,4 +276,29 @@ public class StandardQueueEntryListTest
next = next.getNextValidEntry();
assertNull("The next entry after the last should be null", next);
}
+
+    /**
+     * Verifies that the first entry added to a fresh list keeps being reported by
+     * {@code getLesserOldestEntry()} while further entries are appended.
+     */
+    public void testGetLesserOldestEntry()
+    {
+        StandardQueueEntryList entries = new StandardQueueEntryList(_testQueue, _testQueue.getQueueStatistics());
+
+        QueueEntry firstAdded = entries.add(createServerMessage(1), null);
+        assertEquals("Unexpected last message", firstAdded, entries.getLesserOldestEntry());
+
+        // Messages 2 and 3 are appended in turn; the answer is expected to stay the same.
+        for (long messageId = 2; messageId <= 3; messageId++)
+        {
+            entries.add(createServerMessage(messageId), null);
+            assertEquals("Unexpected last message", firstAdded, entries.getLesserOldestEntry());
+        }
+    }
+
+    /**
+     * Creates a mocked message reporting the given message number, together with a mocked
+     * reference that is returned from both {@code newReference} variants and points back at
+     * the message.
+     *
+     * @param id the message number the mock reports
+     * @return the mocked message
+     */
+    private ServerMessage createServerMessage(final long id)
+    {
+        final ServerMessage serverMessage = mock(ServerMessage.class);
+        final MessageReference messageReference = mock(MessageReference.class);
+
+        when(serverMessage.getMessageNumber()).thenReturn(id);
+        when(serverMessage.newReference()).thenReturn(messageReference);
+        when(serverMessage.newReference(any(TransactionLogResource.class))).thenReturn(messageReference);
+        when(messageReference.getMessage()).thenReturn(serverMessage);
+
+        return serverMessage;
+    }
}
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java?rev=1784178&r1=1784177&r2=1784178&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java Thu Feb 23 17:11:04 2017
@@ -240,6 +240,11 @@ public class StandardQueueTest extends A
}
+ /**
+  * Returns the result of {@code getOldestEntry()} unchanged; this implementation applies no
+  * selection logic of its own.
+  */
+ @Override
+ public QueueEntry getLesserOldestEntry()
+ {
+ return getOldestEntry();
+ }
}
private static class DequeuedQueueEntry extends OrderedQueueEntry
Added: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecovererTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecovererTest.java?rev=1784178&view=auto
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecovererTest.java (added)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecovererTest.java Thu Feb 23 17:11:04 2017
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.server.store;
+
+
+import static org.mockito.Mockito.mock;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.qpid.server.model.VirtualHostNode;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+/**
+ * Unit tests for {@link VirtualHostStoreUpgraderAndRecoverer}, run against a mocked virtual host
+ * node and a mocked configuration store.
+ */
+public class VirtualHostStoreUpgraderAndRecovererTest extends QpidTestCase
+{
+    private VirtualHostNode<?> _virtualHostNode;
+    private VirtualHostStoreUpgraderAndRecoverer _upgraderAndRecoverer;
+    private DurableConfigurationStore _store;
+
+    @Override
+    public void setUp() throws Exception
+    {
+        super.setUp();
+
+        _virtualHostNode = mock(VirtualHostNode.class);
+        _store = mock(DurableConfigurationStore.class);
+        _upgraderAndRecoverer = new VirtualHostStoreUpgraderAndRecoverer(_virtualHostNode);
+    }
+
+    /**
+     * Upgrades a model version 6.1 virtual host record with one queue that carries
+     * {@code queueFlowControlSizeBytes=1000} and {@code queueFlowResumeSizeBytes=700}, and expects
+     * the upgraded queue to have {@code maximumQueueDepthBytes=1000} and a context entry
+     * {@code queue.queueFlowResumeLimit} of {@code "70.00"}.
+     */
+    public void testUpgradeForFlowControlFrom_6_1() throws Exception
+    {
+        Map<String, Object> rootAttributes = new HashMap<>();
+        rootAttributes.put("modelVersion", "6.1");
+        rootAttributes.put("name", "root");
+        ConfiguredObjectRecord rootRecord =
+                new ConfiguredObjectRecordImpl(UUID.randomUUID(), "VirtualHost", rootAttributes);
+        Map<String, Object> queueAttributes = new HashMap<>();
+        queueAttributes.put("name", "queue");
+        queueAttributes.put("queueFlowControlSizeBytes", 1000);
+        queueAttributes.put("queueFlowResumeSizeBytes", 700);
+        ConfiguredObjectRecord queueRecord = new ConfiguredObjectRecordImpl(UUID.randomUUID(), "Queue", queueAttributes,
+                                                                            Collections.singletonMap(rootRecord.getType(),
+                                                                                                     rootRecord.getId()));
+        List<ConfiguredObjectRecord> records = Arrays.asList(rootRecord, queueRecord);
+        List<ConfiguredObjectRecord> upgradedRecords =
+                _upgraderAndRecoverer.upgrade(_store, records, "VirtualHost", "modelVersion");
+
+        ConfiguredObjectRecord upgradedQueueRecord = findRecordById(queueRecord.getId(), upgradedRecords);
+        assertNotNull("Upgraded queue record not found ", upgradedQueueRecord);
+
+        Map<String, Object> upgradedAttributes = upgradedQueueRecord.getAttributes();
+        assertNotNull("Upgraded attributes not found", upgradedAttributes);
+
+        assertEquals("Unexpected maximumQueueDepthBytes", 1000, upgradedAttributes.get("maximumQueueDepthBytes"));
+
+        // Check the context's presence and type explicitly so that a missing or malformed value
+        // fails with a descriptive assertion rather than a NullPointerException or
+        // ClassCastException; the wildcard cast also avoids an unchecked conversion.
+        Object upgradedContext = upgradedAttributes.get("context");
+        assertTrue("Upgraded context is missing or is not a map", upgradedContext instanceof Map);
+        assertEquals("Unexpected queue.queueFlowResumeLimit",
+                     "70.00",
+                     ((Map<?, ?>) upgradedContext).get("queue.queueFlowResumeLimit"));
+    }
+
+    /**
+     * Looks up a record by id.
+     *
+     * @param id      the id to look for
+     * @param records the records to search
+     * @return the first record whose id equals {@code id}, or {@code null} if there is none
+     */
+    private ConfiguredObjectRecord findRecordById(UUID id, List<ConfiguredObjectRecord> records)
+    {
+        for (ConfiguredObjectRecord record : records)
+        {
+            if (id.equals(record.getId()))
+            {
+                return record;
+            }
+        }
+        return null;
+    }
+}
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1784178&r1=1784177&r2=1784178&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java Thu Feb 23 17:11:04 2017
@@ -82,11 +82,9 @@ import org.apache.qpid.server.model.Brok
import org.apache.qpid.server.model.Consumer;
import org.apache.qpid.server.model.NamedAddressSpace;
import org.apache.qpid.server.model.Queue;
-import org.apache.qpid.server.protocol.CapacityChecker;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.server.transport.network.Frame;
import org.apache.qpid.server.txn.AlreadyKnownDtxException;
@@ -120,7 +118,6 @@ public class ServerSession extends Sessi
private final AtomicBoolean _blocking = new AtomicBoolean(false);
private final AtomicInteger _outstandingCredit = new AtomicInteger(UNLIMITED_CREDIT);
- private final CheckCapacityAction _checkCapacityAction = new CheckCapacityAction();
private final long timeout = 60000; // TODO server side close does not require this
// completed incoming commands
private final Object processedLock = new Object();
@@ -1007,24 +1004,11 @@ public class ServerSession extends Sessi
}
final RoutingResult<MessageTransferMessage> result =
exchange.route(message, message.getInitialRoutingAddress(), instanceProperties);
- if (result.isRoutingFailure())
- {
- org.apache.qpid.server.transport.ExecutionException ex = new org.apache.qpid.server.transport.ExecutionException();
- ex.setErrorCode(ExecutionErrorCode.get(result.getErrorCodeAmqp_0_10()));
- ex.setCommandId((int) message.getMessageNumber());
- ex.setDescription(result.getErrorMessage());
- invoke(ex);
- close(ExecutionErrorCode.get(result.getErrorCodeAmqp_0_10()).getValue(), result.getErrorMessage());
- return 0;
- }
- else
- {
- int enqueues = result.send(_transaction, _checkCapacityAction);
- getAMQPConnection().registerMessageReceived(message.getSize(), message.getArrivalTime());
- incrementOutstandingTxnsIfNecessary();
- incrementUncommittedMessageSize(message.getStoredMessage());
- return enqueues;
- }
+ int enqueues = result.send(_transaction, null);
+ getAMQPConnection().registerMessageReceived(message.getSize(), message.getArrivalTime());
+ incrementOutstandingTxnsIfNecessary();
+ incrementUncommittedMessageSize(message.getStoredMessage());
+ return enqueues;
}
private void resetUncommittedMessages()
@@ -1906,19 +1890,6 @@ public class ServerSession extends Sessi
public void closed(ServerSession ssn) {}
}
- private class CheckCapacityAction implements Action<MessageInstance>
- {
- @Override
- public void performAction(final MessageInstance entry)
- {
- TransactionLogResource queue = entry.getOwningResource();
- if(queue instanceof CapacityChecker)
- {
- ((CapacityChecker)queue).checkCapacity(_modelObject);
- }
- }
- }
-
private class ResultFuture<T> implements Future<T>
{
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1784178&r1=1784177&r2=1784178&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Thu Feb 23 17:11:04 2017
@@ -76,7 +76,6 @@ import org.apache.qpid.server.message.Me
import org.apache.qpid.server.message.RoutingResult;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.*;
-import org.apache.qpid.server.protocol.CapacityChecker;
import org.apache.qpid.server.protocol.ProtocolVersion;
import org.apache.qpid.server.protocol.v0_8.UnacknowledgedMessageMap.Visitor;
import org.apache.qpid.server.protocol.v0_8.transport.*;
@@ -177,7 +176,6 @@ public class AMQChannel extends Abstract
private final ClientDeliveryMethod _clientDeliveryMethod;
- private final CapacityCheckAction _capacityCheckAction = new CapacityCheckAction();
private final ImmediateAction _immediateAction = new ImmediateAction();
private final CopyOnWriteArrayList<Consumer<?, ConsumerTarget_0_8>> _consumers = new CopyOnWriteArrayList<>();
private Session<?> _modelObject;
@@ -462,7 +460,7 @@ public class AMQChannel extends Abstract
amqMessage.getInitialRoutingAddress(),
instanceProperties);
- int enqueues = result.send(_transaction, immediate ? _immediateAction : _capacityCheckAction);
+ int enqueues = result.send(_transaction, immediate ? _immediateAction : null);
if (enqueues == 0)
{
finallyAction = handleUnroutableMessage(amqMessage);
@@ -1414,8 +1412,6 @@ public class AMQChannel extends Abstract
@Override
public void performAction(MessageInstance entry)
{
- TransactionLogResource queue = entry.getOwningResource();
-
if (!entry.getDeliveredToConsumer() && entry.acquire())
{
@@ -1456,29 +1452,6 @@ public class AMQChannel extends Abstract
{
ref.release();
}
-
-
- }
- else
- {
- if(queue instanceof CapacityChecker)
- {
- ((CapacityChecker)queue).checkCapacity(AMQChannel.this);
- }
- }
-
- }
- }
-
- private final class CapacityCheckAction implements Action<MessageInstance>
- {
- @Override
- public void performAction(final MessageInstance entry)
- {
- TransactionLogResource queue = entry.getOwningResource();
- if(queue instanceof CapacityChecker)
- {
- ((CapacityChecker)queue).checkCapacity(AMQChannel.this);
}
}
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java?rev=1784178&r1=1784177&r2=1784178&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java Thu Feb 23 17:11:04 2017
@@ -106,23 +106,26 @@ public class NodeReceivingDestination im
}};
RoutingResult result = _destination.route(message, routingAddress, instanceProperties);
- if (result.isRoutingFailure())
+ int enqueues = result.send(txn, action);
+
+ if(enqueues == 0)
{
- return createdRejectedOutcome(AmqpError.valueOf(result.getErrorCodeAmqp_1_0()),
- result.getErrorMessage());
+ _eventLogger.message(ExchangeMessages.DISCARDMSG(_destination.getName(), routingAddress));
}
- else
- {
- int enqueues = result.send(txn, action);
- if(enqueues == 0)
+ if (enqueues == 0 && !_discardUnroutable)
+ {
+ if (result.hasNotAcceptingRoutableQueue())
{
- _eventLogger.message(ExchangeMessages.DISCARDMSG(_destination.getName(), routingAddress));
+ return createdRejectedOutcome(AmqpError.PRECONDITION_FAILED,
+ result.getUnacceptanceCause());
}
-
- return enqueues == 0 && !_discardUnroutable ?
- createdRejectedOutcome(AmqpError.NOT_FOUND,
- "Unknown destination '" + routingAddress + '"') : ACCEPTED;
+ return createdRejectedOutcome(AmqpError.NOT_FOUND,
+ "Unknown destination '" + routingAddress + '"');
+ }
+ else
+ {
+ return ACCEPTED;
}
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1784178&r1=1784177&r2=1784178&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Thu Feb 23 17:11:04 2017
@@ -72,7 +72,6 @@ import org.apache.qpid.server.model.NotF
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.Session;
import org.apache.qpid.server.model.State;
-import org.apache.qpid.server.protocol.CapacityChecker;
import org.apache.qpid.server.protocol.LinkRegistry;
import org.apache.qpid.server.protocol.v1_0.codec.QpidByteBufferUtils;
import org.apache.qpid.server.protocol.v1_0.framing.OversizeFrameException;
@@ -149,8 +148,6 @@ public class Session_1_0 extends Abstrac
private short _receivingChannel;
private final short _sendingChannel;
- private final CapacityCheckAction _capacityCheckAction = new CapacityCheckAction();
-
// has to be a power of two
private static final int DEFAULT_SESSION_BUFFER_SIZE = 1 << 11;
@@ -1888,11 +1885,6 @@ public class Session_1_0 extends Abstrac
_unacknowledgedMessages--;
}
- public CapacityCheckAction getCapacityCheckAction()
- {
- return _capacityCheckAction;
- }
-
@Override
public String toString()
{
@@ -2026,19 +2018,6 @@ public class Session_1_0 extends Abstrac
return primaryDomain;
}
- private final class CapacityCheckAction implements Action<MessageInstance>
- {
- @Override
- public void performAction(final MessageInstance entry)
- {
- TransactionLogResource queue = entry.getOwningResource();
- if(queue instanceof CapacityChecker)
- {
- ((CapacityChecker)queue).checkCapacity(Session_1_0.this);
- }
- }
- }
-
private final class BindingInfo
{
private final Map<Symbol, Filter> _actualFilters = new HashMap<>();
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java?rev=1784178&r1=1784177&r2=1784178&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java Thu Feb 23 17:11:04 2017
@@ -242,7 +242,7 @@ public class StandardReceivingLinkEndpoi
.checkAuthorizedMessagePrincipal(serverMessage.getMessageHeader().getUserId());
getReceivingDestination().authorizePublish(session.getSecurityToken(), routingAddress);
- Outcome outcome = getReceivingDestination().send(serverMessage, routingAddress, transaction, session.getCapacityCheckAction());
+ Outcome outcome = getReceivingDestination().send(serverMessage, routingAddress, transaction, null);
Source source = (Source) getSource();
DeliveryState resultantState;
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/AmqpError.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/AmqpError.java?rev=1784178&r1=1784177&r2=1784178&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/AmqpError.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/AmqpError.java Thu Feb 23 17:11:04 2017
@@ -149,11 +149,6 @@ public class AmqpError
}
}
- public static AmqpError valueOf(String errorCode)
- {
- return valueOf(Symbol.valueOf(errorCode));
- }
-
public static AmqpError valueOf(Object obj)
{
Symbol val = (Symbol) obj;
Modified: qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/addQueue.html
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/addQueue.html?rev=1784178&r1=1784177&r2=1784178&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/addQueue.html (original)
+++ qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/addQueue.html Thu Feb 23 17:11:04 2017
@@ -137,49 +137,55 @@
</select>
</div>
</div>
- <div class="clear">
- <div class="formLabel-labelCell">Overflow policy:</div>
- <div class="formLabel-controlCell">
- <select id="formAddQueue.overflowPolicy"
- dojoType="dijit.form.FilteringSelect"
- data-dojo-props="
+
+ <div class="clear formBox">
+ <fieldset>
+ <legend>Overflow Settings</legend>
+ <div class="clear">
+ <div class="formLabel-labelCell">Overflow policy:</div>
+ <div class="formLabel-controlCell">
+ <select id="formAddQueue.overflowPolicy"
+ dojoType="dijit.form.FilteringSelect"
+ data-dojo-props="
name: 'overflowPolicy',
- value: '',
+ value: 'None',
searchAttr: 'name',
required: false,
- promptMessage: 'Overflow policy override. If not default, messages arriving will have overflow policy setting overridden',
- title: 'Enter overflow policy override'">
- <option value="NONE">None</option>
- <option value="RING">Ring</option>
- </select>
- </div>
- </div>
-
- <div id="formAddQueueOverflowPolicy:RING" class="hidden overflowPolicySpecificDiv">
- <div class="clear">
- <div class="formLabel-labelCell">Maximum count:</div>
- <div class="formLabel-controlCell">
- <input type="text" id="formAddQueueOverflowPolicy.maxCount"
- data-dojo-type="dijit/form/ValidationTextBox"
- data-dojo-props="
- name: 'maxCount',
- placeHolder: 'number of messages',
- promptMessage: 'Maximum number of messages in the queue',
- title: 'Enter the maximum number of messages in the queue',
- trim: true"/>
+ promptMessage: 'Select overflow policy to use',
+ title: 'Select overflow policy override'">
+ <option value="NONE">None</option>
+ <option value="RING">Ring</option>
+ <option value="PRODUCER_FLOW_CONTROL">Producer Flow Control</option>
+ </select>
+ </div>
</div>
- <div class="formLabel-labelCell">Maximum size:</div>
- <div class="formLabel-controlCell">
- <input type="text" id="formAddQueueoverflowPolicy.maxSize"
- data-dojo-type="dijit/form/ValidationTextBox"
- data-dojo-props="
- name: 'maxSize',
- placeHolder: 'size of messages',
- promptMessage: 'Maximum size of messages (including header) in the queue',
- title: 'Enter the maximum size of messages (including header) in the queue',
- trim: true"/>
+ <div class="clear">
+ <div class="formLabel-labelCell">Maximum Queue Depth (Messages):</div>
+ <div class="formLabel-controlCell">
+ <input type="text" id="formAddQueue.maximumQueueDepthMessages"
+ data-dojo-type="dijit/form/ValidationTextBox"
+ data-dojo-props="
+ name: 'maximumQueueDepthMessages',
+ placeHolder: 'maximum number of messages',
+ promptMessage: 'Maximum number of messages in the queue',
+ title: 'Enter the maximum number of messages in the queue',
+ trim: true"/>
+ </div>
</div>
- </div>
+ <div class="clear">
+ <div class="formLabel-labelCell">Maximum Queue Depth including headers (Bytes):</div>
+ <div class="formLabel-controlCell">
+ <input type="text" id="formAddQueue.maximumQueueDepthBytes"
+ data-dojo-type="dijit/form/ValidationTextBox"
+ data-dojo-props="
+ name: 'maximumQueueDepthBytes',
+ placeHolder: 'maximum number of bytes including headers',
+ promptMessage: 'Maximum number of bytes (including header) in the queue',
+ title: 'Enter the maximum number of bytes (including header) in the queue',
+ trim: true"/>
+ </div>
+ </div>
+ </fieldset>
<div class="clear"></div>
</div>
@@ -211,39 +217,6 @@
</div>
<div class="clear formBox">
- <fieldset>
- <legend>Flow Control Settings</legend>
- <div class="clear">
- <div class="formLabel-labelCell">Capacity:</div>
- <div class="formLabel-controlCell">
- <input type="text" id="formAddQueue.queueFlowControlSizeBytes"
- data-dojo-type="dijit/form/ValidationTextBox"
- data-dojo-props="
- name: 'queueFlowControlSizeBytes',
- placeHolder: 'size in bytes',
- promptMessage: 'Ceiling (in bytes) at which queue will begin to throttle sessions producing messages',
- title: 'Enter the ceiling (in bytes) at which queue will begin to throttle sessions producing messages',
- trim: true"/>
- </div>
- </div>
- <div class="clear">
- <div class="formLabel-labelCell">Resume Capacity:</div>
- <div class="formLabel-controlCell">
- <input type="text" id="formAddQueue.queueFlowResumeSizeBytes"
- data-dojo-type="dijit/form/ValidationTextBox"
- data-dojo-props="
- name: 'queueFlowResumeSizeBytes',
- placeHolder: 'size in bytes',
- promptMessage: 'Floor (in bytes) at which queue will cease to throttle sessions producing messages',
- title: 'Enter the floor (in bytes) at which queue will cease to throttle sessions producing messages',
- trim: true"/>
- </div>
- </div>
- </fieldset>
- <div class="clear"></div>
- </div>
-
- <div class="clear formBox">
<fieldset>
<legend>Alerting Settings</legend>
<div class="clear">
Modified: qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/editQueue.html
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/editQueue.html?rev=1784178&r1=1784177&r2=1784178&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/editQueue.html (original)
+++ qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/editQueue.html Thu Feb 23 17:11:04 2017
@@ -131,6 +131,58 @@
</select>
</div>
</div>
+
+ <div class="clear formBox">
+ <fieldset>
+ <legend>Overflow Settings</legend>
+ <div class="clear">
+ <div class="formLabel-labelCell">Overflow policy:</div>
+ <div class="formLabel-controlCell">
+ <select id="formEditQueue.overflowPolicy"
+ dojoType="dijit.form.FilteringSelect"
+ data-dojo-props="
+ name: 'overflowPolicy',
+ value: 'None',
+ searchAttr: 'name',
+ required: false,
+ promptMessage: 'Select overflow policy to use',
+ title: 'Select overflow policy'">
+ <option value="NONE">None</option>
+ <option value="RING">Ring</option>
+ <option value="PRODUCER_FLOW_CONTROL">Producer Flow Control</option>
+ </select>
+ </div>
+ </div>
+ <div class="clear">
+ <div class="formLabel-labelCell">Maximum Queue Depth (Messages):</div>
+ <div class="formLabel-controlCell">
+ <input type="text" id="formEditQueue.maximumQueueDepthMessages"
+ data-dojo-type="dijit/form/ValidationTextBox"
+ data-dojo-props="
+ name: 'maximumQueueDepthMessages',
+ placeHolder: 'maximum number of messages',
+ promptMessage: 'Maximum number of messages in the queue',
+ title: 'Enter the maximum number of messages in the queue',
+ trim: true"/>
+ </div>
+ </div>
+ <div class="clear">
+ <div class="formLabel-labelCell">Maximum Queue Depth including headers (Bytes):</div>
+ <div class="formLabel-controlCell">
+ <input type="text" id="formEditQueue.maximumQueueDepthBytes"
+ data-dojo-type="dijit/form/ValidationTextBox"
+ data-dojo-props="
+ name: 'maximumQueueDepthBytes',
+ placeHolder: 'maximum number of bytes',
+ promptMessage: 'Maximum number of bytes (including header) in the queue',
+ title: 'Enter the maximum number of bytes (including header) in the queue',
+ trim: true"/>
+ </div>
+ </div>
+ </fieldset>
+ <div class="clear"></div>
+ </div>
+
<div class="clear">
<div class="formLabel-labelCell">Maximum Ttl:</div>
<div class="formLabel-controlCell">
@@ -162,39 +214,6 @@
<div class="clear formBox">
<fieldset>
- <legend>Flow Control Settings</legend>
- <div class="clear">
- <div class="formLabel-labelCell">Capacity:</div>
- <div class="formLabel-controlCell">
- <input type="text" id="formEditQueue.queueFlowControlSizeBytes"
- data-dojo-type="dijit/form/ValidationTextBox"
- data-dojo-props="
- name: 'queueFlowControlSizeBytes',
- placeHolder: 'size in bytes',
- promptMessage: 'Ceiling (in bytes) at which queue will begin to throttle sessions producing messages',
- title: 'Enter the ceiling (in bytes) at which queue will begin to throttle sessions producing messages',
- trim: true"/>
- </div>
- </div>
- <div class="clear">
- <div class="formLabel-labelCell">Resume Capacity:</div>
- <div class="formLabel-controlCell">
- <input type="text" id="formEditQueue.queueFlowResumeSizeBytes"
- data-dojo-type="dijit/form/ValidationTextBox"
- data-dojo-props="
- name: 'queueFlowResumeSizeBytes',
- placeHolder: 'size in bytes',
- promptMessage: 'Floor (in bytes) at which queue will cease to throttle sessions producing messages',
- title: 'Enter the floor (in bytes) at which queue will cease to throttle sessions producing messages',
- trim: true"/>
- </div>
- </div>
- </fieldset>
- <div class="clear"></div>
- </div>
-
- <div class="clear formBox">
- <fieldset>
<legend>Alerting Settings</legend>
<div class="clear">
<div class="formLabel-labelCell">Queue Depth (Messages):</div>
Modified: qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js?rev=1784178&r1=1784177&r2=1784178&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js (original)
+++ qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js Thu Feb 23 17:11:04 2017
@@ -387,7 +387,8 @@ define(["dojo/_base/declare",
"owner",
"lifetimePolicy",
"overflowPolicy",
- "overflowPolicyQualifier",
+ "maximumQueueDepthBytes",
+ "maximumQueueDepthMessages",
"type",
"typeQualifier",
"alertRepeatGap",
@@ -418,8 +419,6 @@ define(["dojo/_base/declare",
"msgOutRate",
"bytesOutRate",
"bytesOutRateUnits",
- "queueFlowResumeSizeBytes",
- "queueFlowControlSizeBytes",
"maximumDeliveryAttempts",
"holdOnPublishEnabled",
"oldestMessageAge"]);
@@ -503,16 +502,8 @@ define(["dojo/_base/declare",
}
this["overflowPolicy"].innerHTML = entities.encode(this.queueData["overflowPolicy"]);
- if (this.queueData["overflowPolicy"] == "NONE")
- {
- this.overflowPolicyQualifier.style.display = "none";
- }
- else if (this.queueData["overflowPolicy"] == "RING")
- {
- bytesDepth = formatter.formatBytes(this.queueData["maxSize"]);
- this.overflowPolicyQualifier.innerHTML = "(Max count: " + entities.encode(String(this.queueData["maxCount"]))
- + ", Max Size: " + bytesDepth.value + " " + bytesDepth.units + ")";
- }
+ this["maximumQueueDepthBytes"].innerHTML = this.queueData["maximumQueueDepthBytes"] ? entities.encode(new String(this.queueData["maximumQueueDepthBytes"])) : "";
+ this["maximumQueueDepthMessages"].innerHTML = this.queueData["maximumQueueDepthMessages"] ? entities.encode(new String(this.queueData["maximumQueueDepthMessages"])) : "";
if (this.queueData["messageGroupKey"])
{
@@ -526,11 +517,6 @@ define(["dojo/_base/declare",
this.messageGroups.style.display = "none";
}
- this.queueFlowControlSizeBytes.innerHTML =
- entities.encode(String(this.queueData["queueFlowControlSizeBytes"]));
- this.queueFlowResumeSizeBytes.innerHTML =
- entities.encode(String(this.queueData["queueFlowResumeSizeBytes"]));
-
this.oldestMessageAge.innerHTML = entities.encode(String(this.queueData["oldestMessageAge"] / 1000));
var maximumDeliveryAttempts = this.queueData["maximumDeliveryAttempts"];
this.maximumDeliveryAttempts.innerHTML =
Modified: qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addQueue.js
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addQueue.js?rev=1784178&r1=1784177&r2=1784178&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addQueue.js (original)
+++ qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addQueue.js Thu Feb 23 17:11:04 2017
@@ -61,8 +61,6 @@ define(["dojo/dom",
var numericFieldNames = ["maximumMessageTtl",
"minimumMessageTtl",
- "queueFlowControlSizeBytes",
- "queueFlowResumeSizeBytes",
"alertThresholdQueueDepthMessages",
"alertThresholdQueueDepthBytes",
"alertThresholdMessageAge",
@@ -103,27 +101,6 @@ define(["dojo/dom",
}
});
- var typeSelector = registry.byId("formAddQueue.overflowPolicy");
- typeSelector.on("change", function (value)
- {
- query(".overflowPolicySpecificDiv")
- .forEach(function (node, index, arr)
- {
- if (node.id === "formAddQueueOverflowPolicy:" + value)
- {
- node.style.display = "block";
- if (addQueue.management)
- {
- util.applyMetadataToWidgets(node, "Queue", value, addQueue.management.metadata);
- }
- }
- else
- {
- node.style.display = "none";
- }
- });
- });
-
theForm.on("submit", function (e)
{
@@ -173,6 +150,9 @@ define(["dojo/dom",
.set("regExpGen", util.numericOrContextVarRegexp);
}
+ registry.byId("formAddQueue.maximumQueueDepthBytes").set("regExpGen", util.signedOrContextVarRegexp);
+ registry.byId("formAddQueue.maximumQueueDepthMessages").set("regExpGen", util.signedOrContextVarRegexp);
+
if (!this.context)
{
this.context = new qpid.common.ContextVariablesEditor({
Modified: qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/management/editQueue.js
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/management/editQueue.js?rev=1784178&r1=1784177&r2=1784178&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/management/editQueue.js (original)
+++ qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/management/editQueue.js Thu Feb 23 17:11:04 2017
@@ -46,8 +46,6 @@ define(["dojox/html/entities",
var numericFieldNames = ["maximumMessageTtl",
"minimumMessageTtl",
- "queueFlowControlSizeBytes",
- "queueFlowResumeSizeBytes",
"alertThresholdQueueDepthMessages",
"alertThresholdQueueDepthBytes",
"alertThresholdMessageAge",
@@ -165,6 +163,9 @@ define(["dojox/html/entities",
.set("regExpGen", util.numericOrContextVarRegexp);
}
+ registry.byId("formEditQueue.maximumQueueDepthBytes").set("regExpGen", util.signedOrContextVarRegexp);
+ registry.byId("formEditQueue.maximumQueueDepthMessages").set("regExpGen", util.signedOrContextVarRegexp);
+
var queueType = this.typeSelector.get("value");
query(".typeSpecificDiv")
.forEach(function (node, index, arr)
Modified: qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/showQueue.html
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/showQueue.html?rev=1784178&r1=1784177&r2=1784178&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/showQueue.html (original)
+++ qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/showQueue.html Thu Feb 23 17:11:04 2017
@@ -49,9 +49,16 @@
<div class="messageDurability formValue-valueCell"></div>
</div>
<div class="clear">
- <div class="formLabel-labelCell">Overflow policy:</div>
+ <div class="formLabel-labelCell">Overflow Policy:</div>
<div class="overflowPolicy formValue-valueCell"></div>
- <div class="overflowPolicyQualifier formValue-valueCell"></div>
+ </div>
+ <div class="clear">
+ <div class="formLabel-labelCell">Maximum Queue Depth (Messages):</div>
+ <div class="maximumQueueDepthMessages formValue-valueCell"></div>
+ </div>
+ <div class="clear">
+ <div class="formLabel-labelCell">Maximum Queue Depth including headers (Bytes):</div>
+ <div class="maximumQueueDepthBytes formValue-valueCell"></div>
</div>
</div>
<div class="alignRight">
@@ -167,24 +174,6 @@
<button data-dojo-type="dijit.form.Button" class="copyMessagesButton" type="button">Copy Messages</button>
<button data-dojo-type="dijit.form.Button" class="refreshMessagesButton" type="button">Refresh</button>
</div>
- <br/>
- <div data-dojo-type="dijit.TitlePane" data-dojo-props="title: 'Flow Control Settings', open: false">
- <div class="clear">
- <div class="formLabel-labelCell">Capacity:</div>
- <div>
- <span class="queueFlowControlSizeBytes"></span>
- <span>B</span>
- </div>
- </div>
- <div class="clear">
- <div class="formLabel-labelCell">Resume Capacity:</div>
- <div>
- <span class="queueFlowResumeSizeBytes"></span>
- <span>B</span>
- </div>
- </div>
- <div class="clear"></div>
- </div>
<br/>
<div data-dojo-type="dijit.TitlePane" data-dojo-props="title: 'Alerting Thresholds', open: false">
Modified: qpid/java/trunk/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java?rev=1784178&r1=1784177&r2=1784178&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java (original)
+++ qpid/java/trunk/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java Thu Feb 23 17:11:04 2017
@@ -148,10 +148,8 @@ public class Asserts
ExclusivityPolicy.NONE.name(), queueData.get(Queue.EXCLUSIVE));
assertEquals("Unexpected value of queue attribute " + Queue.MAXIMUM_DELIVERY_ATTEMPTS, 0,
queueData.get(Queue.MAXIMUM_DELIVERY_ATTEMPTS));
- assertEquals("Unexpected value of queue attribute " + Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES, 0,
- queueData.get(Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES));
- assertEquals("Unexpected value of queue attribute " + Queue.QUEUE_FLOW_RESUME_SIZE_BYTES, 0,
- queueData.get(Queue.QUEUE_FLOW_RESUME_SIZE_BYTES));
+ assertEquals("Unexpected value of queue attribute " + Queue.MAXIMUM_QUEUE_DEPTH_MESSAGES, -1,
+ queueData.get(Queue.MAXIMUM_QUEUE_DEPTH_MESSAGES));
assertEquals("Unexpected value of queue attribute " + Queue.QUEUE_FLOW_STOPPED, Boolean.FALSE,
queueData.get(Queue.QUEUE_FLOW_STOPPED));
}
Modified: qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java?rev=1784178&r1=1784177&r2=1784178&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java (original)
+++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java Thu Feb 23 17:11:04 2017
@@ -21,6 +21,7 @@
package org.apache.qpid.server.queue;
import java.io.IOException;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -45,6 +46,7 @@ import org.apache.qpid.client.AMQSession
import org.apache.qpid.server.logging.AbstractTestLogging;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.OverflowPolicy;
import org.apache.qpid.systest.rest.RestTestHelper;
import org.apache.qpid.test.utils.TestBrokerConfiguration;
@@ -249,9 +251,7 @@ public class ProducerFlowControlTest ext
//check current attribute values are 0 as expected
Map<String, Object> queueAttributes = _restTestHelper.getJsonAsSingletonList(queueUrl);
assertEquals("Capacity was not the expected value", 0,
- ((Number) queueAttributes.get(org.apache.qpid.server.model.Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES)).intValue());
- assertEquals("FlowResumeCapacity was not the expected value", 0,
- ((Number) queueAttributes.get(org.apache.qpid.server.model.Queue.QUEUE_FLOW_RESUME_SIZE_BYTES)).intValue());
+ ((Number) queueAttributes.get(org.apache.qpid.server.model.Queue.MAXIMUM_QUEUE_DEPTH_BYTES)).intValue());
//set new values that will cause flow control to be active, and the queue to become overfull after 1 message is sent
setFlowLimits(queueUrl, 250, 250);
@@ -319,11 +319,19 @@ public class ProducerFlowControlTest ext
private void setFlowLimits(final String queueUrl, final int blockValue, final int resumeValue) throws IOException
{
final Map<String, Object> attributes = new HashMap<>();
- attributes.put(org.apache.qpid.server.model.Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES, blockValue);
- attributes.put(org.apache.qpid.server.model.Queue.QUEUE_FLOW_RESUME_SIZE_BYTES, resumeValue);
+ attributes.put(org.apache.qpid.server.model.Queue.MAXIMUM_QUEUE_DEPTH_BYTES, blockValue);
+ attributes.put(org.apache.qpid.server.model.Queue.OVERFLOW_POLICY, OverflowPolicy.PRODUCER_FLOW_CONTROL);
+ String resumeLimit = getFlowResumeLimit(blockValue, resumeValue);
+ Map<String, String> context = Collections.singletonMap(org.apache.qpid.server.model.Queue.QUEUE_FLOW_RESUME_LIMIT, resumeLimit);
+ attributes.put(org.apache.qpid.server.model.Queue.CONTEXT, context);
_restTestHelper.submitRequest(queueUrl, "PUT", attributes);
}
+ private String getFlowResumeLimit(final double blockValue, final double resumeValue)
+ {
+ return String.format("%.2f", resumeValue / blockValue * 100.0);
+ }
+
private boolean isFlowStopped(final String queueUrl) throws IOException
{
Map<String, Object> queueAttributes2 = _restTestHelper.getJsonAsSingletonList(queueUrl);
@@ -374,11 +382,18 @@ public class ProducerFlowControlTest ext
if(isBroker10())
{
final Map<String, Object> attributes = new HashMap<>();
- attributes.put(org.apache.qpid.server.model.Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES, capacity);
- attributes.put(org.apache.qpid.server.model.Queue.QUEUE_FLOW_RESUME_SIZE_BYTES, resumeCapacity);
+ if (capacity != 0)
+ {
+ attributes.put(org.apache.qpid.server.model.Queue.CONTEXT,
+ Collections.singletonMap(org.apache.qpid.server.model.Queue.QUEUE_FLOW_RESUME_LIMIT,
+ getFlowResumeLimit(capacity, resumeCapacity)));
+ }
+ attributes.put(org.apache.qpid.server.model.Queue.MAXIMUM_QUEUE_DEPTH_BYTES, capacity);
+ attributes.put(org.apache.qpid.server.model.Queue.OVERFLOW_POLICY, OverflowPolicy.PRODUCER_FLOW_CONTROL);
attributes.put(org.apache.qpid.server.model.Queue.DURABLE, durable);
attributes.put(ConfiguredObject.LIFETIME_POLICY, autoDelete ? LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS.name() : LifetimePolicy.PERMANENT.name());
- createEntityUsingAmqpManagement(getTestQueueName(), session, "org.apache.qpid.Queue", attributes);
+ String queueUrl = String.format("queue/%1$s/%1$s/%2$s", TestBrokerConfiguration.ENTRY_NAME_VIRTUAL_HOST, queueName);
+ _restTestHelper.submitRequest(queueUrl, "PUT", attributes, 201);
_queue = session.createQueue(queueName);
}
else
Modified: qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java?rev=1784178&r1=1784177&r2=1784178&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java (original)
+++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java Thu Feb 23 17:11:04 2017
@@ -39,7 +39,6 @@ import com.fasterxml.jackson.databind.Ob
import org.apache.qpid.server.common.AMQPFilterTypes;
import org.apache.qpid.server.exchange.ExchangeDefaults;
import org.apache.qpid.server.model.BrokerTestHelper;
-import org.apache.qpid.server.model.OverflowPolicy;
import org.apache.qpid.server.protocol.v0_8.AMQShortString;
import org.apache.qpid.server.protocol.v0_8.transport.BasicContentHeaderProperties;
import org.apache.qpid.server.protocol.v0_8.transport.ContentHeaderBody;
@@ -107,7 +106,6 @@ public class VirtualHostMessageStoreTest
private String durablePriorityQueueName = "MST-PriorityQueue-Durable";
private String durableLastValueQueueName = "MST-LastValueQueue-Durable";
private String durableQueueName = "MST-Queue-Durable";
- private String durableQueueRingOverflowPolicy = "MST-Queue-Ring-OverflowPolicy";
private String priorityQueueName = "MST-PriorityQueue";
private String queueName = "MST-Queue";
@@ -237,7 +235,7 @@ public class VirtualHostMessageStoreTest
validateMessageOnTopics(2, true);
assertEquals("Not all queues correctly registered",
- 11, _virtualHost.getChildren(Queue.class).size());
+ 10, _virtualHost.getChildren(Queue.class).size());
}
public void testMessagePersistence() throws Exception
@@ -321,7 +319,7 @@ public class VirtualHostMessageStoreTest
public void testDurableQueueRemoval() throws Exception
{
//Register Durable Queue
- createQueue(durableQueueName, false, true, false, false, false);
+ createQueue(durableQueueName, false, true, false, false);
assertEquals("Incorrect number of queues registered before recovery",
1, _virtualHost.getChildren(Queue.class).size());
@@ -440,7 +438,7 @@ public class VirtualHostMessageStoreTest
//create durable queue and exchange, bind them
Exchange<?>
exch = createExchange(ExchangeDefaults.DIRECT_EXCHANGE_CLASS, directExchangeName, true);
- createQueue(durableQueueName, false, true, false, false, false);
+ createQueue(durableQueueName, false, true, false, false);
bindQueueToExchange(exch, directRouting, _virtualHost.getChildByName(Queue.class, durableQueueName), false);
assertEquals("Incorrect number of bindings registered before recovery",
@@ -548,16 +546,15 @@ public class VirtualHostMessageStoreTest
private void validateDurableQueueProperties()
{
- validateQueueProperties(_virtualHost.getChildByName(Queue.class, durablePriorityQueueName), true, true, false, false, false);
- validateQueueProperties(_virtualHost.getChildByName(Queue.class, durablePriorityTopicQueueName), true, true, false, false, false);
- validateQueueProperties(_virtualHost.getChildByName(Queue.class, durableQueueName), false, true, false, false, false);
- validateQueueProperties(_virtualHost.getChildByName(Queue.class, durableTopicQueueName), false, true, false, false, false);
- validateQueueProperties(_virtualHost.getChildByName(Queue.class, durableExclusiveQueueName), false, true, true, false, false);
- validateQueueProperties(_virtualHost.getChildByName(Queue.class, durableLastValueQueueName), false, true, true, true, false);
- validateQueueProperties(_virtualHost.getChildByName(Queue.class, durableQueueRingOverflowPolicy), false, true, false, false, true);
+ validateQueueProperties(_virtualHost.getChildByName(Queue.class, durablePriorityQueueName), true, true, false, false);
+ validateQueueProperties(_virtualHost.getChildByName(Queue.class, durablePriorityTopicQueueName), true, true, false, false);
+ validateQueueProperties(_virtualHost.getChildByName(Queue.class, durableQueueName), false, true, false, false);
+ validateQueueProperties(_virtualHost.getChildByName(Queue.class, durableTopicQueueName), false, true, false, false);
+ validateQueueProperties(_virtualHost.getChildByName(Queue.class, durableExclusiveQueueName), false, true, true, false);
+ validateQueueProperties(_virtualHost.getChildByName(Queue.class, durableLastValueQueueName), false, true, true, true);
}
- private void validateQueueProperties(Queue queue, boolean usePriority, boolean durable, boolean exclusive, boolean lastValueQueue, boolean ringPolicy)
+ private void validateQueueProperties(Queue queue, boolean usePriority, boolean durable, boolean exclusive, boolean lastValueQueue)
{
if(usePriority || lastValueQueue)
{
@@ -583,7 +580,6 @@ public class VirtualHostMessageStoreTest
assertEquals("Queue owner is not as expected for queue " + queue.getName(), exclusive ? queueOwner : null, queue.getOwner());
assertEquals("Queue durability is not as expected for queue " + queue.getName(), durable, queue.isDurable());
assertEquals("Queue exclusivity is not as expected for queue " + queue.getName(), exclusive, queue.getExclusive() != ExclusivityPolicy.NONE);
- assertEquals("Queue overflow policy is not as expected for queue " + queue.getName(), ringPolicy, queue.getOverflowPolicy() == OverflowPolicy.RING);
}
/**
@@ -628,43 +624,40 @@ public class VirtualHostMessageStoreTest
private void createAllQueues() throws Exception
{
//Register Durable Priority Queue
- createQueue(durablePriorityQueueName, true, true, false, false, false);
+ createQueue(durablePriorityQueueName, true, true, false, false);
//Register Durable Simple Queue
- createQueue(durableQueueName, false, true, false, false, false);
+ createQueue(durableQueueName, false, true, false, false);
//Register Durable Exclusive Simple Queue
- createQueue(durableExclusiveQueueName, false, true, true, false, false);
+ createQueue(durableExclusiveQueueName, false, true, true, false);
//Register Durable LastValue Queue
- createQueue(durableLastValueQueueName, false, true, true, true, false);
-
- //Register Durable Queue with Ring Overflow Policy
- createQueue(durableQueueRingOverflowPolicy, false, true, false, false, true);
+ createQueue(durableLastValueQueueName, false, true, true, true);
//Register NON-Durable Priority Queue
- createQueue(priorityQueueName, true, false, false, false, false);
+ createQueue(priorityQueueName, true, false, false, false);
//Register NON-Durable Simple Queue
- createQueue(queueName, false, false, false, false, false);
+ createQueue(queueName, false, false, false, false);
}
private void createAllTopicQueues() throws Exception
{
//Register Durable Priority Queue
- createQueue(durablePriorityTopicQueueName, true, true, false, false, false);
+ createQueue(durablePriorityTopicQueueName, true, true, false, false);
//Register Durable Simple Queue
- createQueue(durableTopicQueueName, false, true, false, false, false);
+ createQueue(durableTopicQueueName, false, true, false, false);
//Register NON-Durable Priority Queue
- createQueue(priorityTopicQueueName, true, false, false, false, false);
+ createQueue(priorityTopicQueueName, true, false, false, false);
//Register NON-Durable Simple Queue
- createQueue(topicQueueName, false, false, false, false, false);
+ createQueue(topicQueueName, false, false, false, false);
}
- private void createQueue(String queueName, boolean usePriority, boolean durable, boolean exclusive, boolean lastValueQueue, boolean ringPolicy)
+ private void createQueue(String queueName, boolean usePriority, boolean durable, boolean exclusive, boolean lastValueQueue)
throws Exception
{
@@ -710,7 +703,7 @@ public class VirtualHostMessageStoreTest
});
- validateQueueProperties(queue, usePriority, durable, exclusive, lastValueQueue, ringPolicy);
+ validateQueueProperties(queue, usePriority, durable, exclusive, lastValueQueue);
}
private Map<String, Exchange<?>> createExchanges() throws Exception
Modified: qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/rest/QueueRestTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/rest/QueueRestTest.java?rev=1784178&r1=1784177&r2=1784178&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/rest/QueueRestTest.java (original)
+++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/rest/QueueRestTest.java Thu Feb 23 17:11:04 2017
@@ -116,8 +116,7 @@ public class QueueRestTest extends QpidR
attributes = new HashMap<String, Object>();
attributes.put(Queue.NAME, queueName);
- attributes.put(Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES, 100000);
- attributes.put(Queue.QUEUE_FLOW_RESUME_SIZE_BYTES, 80000);
+ attributes.put(Queue.MAXIMUM_QUEUE_DEPTH_MESSAGES, 100000);
attributes.put(Queue.ALERT_REPEAT_GAP, 10000);
attributes.put(Queue.ALERT_THRESHOLD_MESSAGE_AGE, 20000);
attributes.put(Queue.ALERT_THRESHOLD_MESSAGE_SIZE, 30000);
@@ -129,8 +128,7 @@ public class QueueRestTest extends QpidR
assertEquals("Setting of queue attributes should be allowed", 200, responseCode);
Map<String, Object> queueData = getRestTestHelper().getJsonAsSingletonList(queueUrl);
- assertEquals("Unexpected " + Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES, 100000, queueData.get(Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES) );
- assertEquals("Unexpected " + Queue.QUEUE_FLOW_RESUME_SIZE_BYTES, 80000, queueData.get(Queue.QUEUE_FLOW_RESUME_SIZE_BYTES) );
+ assertEquals("Unexpected " + Queue.MAXIMUM_QUEUE_DEPTH_MESSAGES, 100000, queueData.get(Queue.MAXIMUM_QUEUE_DEPTH_MESSAGES));
assertEquals("Unexpected " + Queue.ALERT_REPEAT_GAP, 10000, queueData.get(Queue.ALERT_REPEAT_GAP) );
assertEquals("Unexpected " + Queue.ALERT_THRESHOLD_MESSAGE_AGE, 20000, queueData.get(Queue.ALERT_THRESHOLD_MESSAGE_AGE) );
assertEquals("Unexpected " + Queue.ALERT_THRESHOLD_MESSAGE_SIZE, 30000, queueData.get(Queue.ALERT_THRESHOLD_MESSAGE_SIZE) );
Modified: qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java?rev=1784178&r1=1784177&r2=1784178&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java (original)
+++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java Thu Feb 23 17:11:04 2017
@@ -640,8 +640,7 @@ public class VirtualHostRestTest extends
attributes.put(Queue.ALERT_THRESHOLD_MESSAGE_SIZE, 1000000000);
attributes.put(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, 800);
attributes.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 15);
- attributes.put(Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES, 2000000000);
- attributes.put(Queue.QUEUE_FLOW_RESUME_SIZE_BYTES, 1500000000);
+ attributes.put(Queue.MAXIMUM_QUEUE_DEPTH_MESSAGES, 2000000000);
createQueue(queueName + "-standard", "standard", attributes);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org