You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/02/24 11:39:09 UTC
svn commit: r1784267 - in /qpid/java/trunk/broker-core/src:
main/java/org/apache/qpid/server/queue/
test/java/org/apache/qpid/server/queue/
Author: orudyy
Date: Fri Feb 24 11:39:08 2017
New Revision: 1784267
URL: http://svn.apache.org/viewvc?rev=1784267&view=rev
Log:
QPID-7618: Simplify handling of overflow/underflow for producer flow control as suggested by Lorenz Quack
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandler.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandlerTest.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/RingOverflowPolicyHandlerTest.java
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandler.java?rev=1784267&r1=1784266&r2=1784267&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandler.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandler.java Fri Feb 24 11:39:08 2017
@@ -61,7 +61,7 @@ public class ProducerFlowControlOverflow
{
private final Queue<?> _queue;
private final EventLogger _eventLogger;
- private final AtomicBoolean _overfull = new AtomicBoolean(false);
+ private final AtomicBoolean _overfullReported = new AtomicBoolean(false);
private final Set<AMQPSession<?, ?>> _blockedSessions =
Collections.newSetFromMap(new ConcurrentHashMap<AMQPSession<?, ?>, Boolean>());
private volatile double _queueFlowResumeLimit;
@@ -132,7 +132,7 @@ public class ProducerFlowControlOverflow
_queue.removeChangeListener(this);
checkUnderfull(-1, -1);
- if (_overfull.compareAndSet(true, false))
+ if (_overfullReported.compareAndSet(true, false))
{
_eventLogger.message(_queue.getLogSubject(),
QueueMessages.UNDERFULL(_queue.getQueueDepthBytes(),
@@ -151,33 +151,30 @@ public class ProducerFlowControlOverflow
boolean isQueueFlowStopped()
{
- return _overfull.get();
+ return _overfullReported.get();
}
private void checkUnderfull(long maximumQueueDepthBytes, long maximumQueueDepthMessages)
{
- if (_overfull.get())
- {
- long queueDepthBytes = _queue.getQueueDepthBytes();
- long queueDepthMessages = _queue.getQueueDepthMessages();
+ long queueDepthBytes = _queue.getQueueDepthBytes();
+ long queueDepthMessages = _queue.getQueueDepthMessages();
- if (isUnderfull(queueDepthBytes, maximumQueueDepthBytes)
- && isUnderfull(queueDepthMessages, maximumQueueDepthMessages))
+ if (isUnderfull(queueDepthBytes, maximumQueueDepthBytes)
+ && isUnderfull(queueDepthMessages, maximumQueueDepthMessages))
+ {
+ if (_overfullReported.compareAndSet(true, false))
{
- if (_overfull.compareAndSet(true, false))
- {
- _eventLogger.message(_queue.getLogSubject(),
- QueueMessages.UNDERFULL(queueDepthBytes,
- getFlowResumeLimit(maximumQueueDepthBytes),
- queueDepthMessages,
- getFlowResumeLimit(maximumQueueDepthMessages)));
- }
+ _eventLogger.message(_queue.getLogSubject(),
+ QueueMessages.UNDERFULL(queueDepthBytes,
+ getFlowResumeLimit(maximumQueueDepthBytes),
+ queueDepthMessages,
+ getFlowResumeLimit(maximumQueueDepthMessages)));
+ }
- for (final AMQPSession<?, ?> blockedSession : _blockedSessions)
- {
- blockedSession.unblock(_queue);
- _blockedSessions.remove(blockedSession);
- }
+ for (final AMQPSession<?, ?> blockedSession : _blockedSessions)
+ {
+ blockedSession.unblock(_queue);
+ _blockedSessions.remove(blockedSession);
}
}
}
@@ -198,7 +195,7 @@ public class ProducerFlowControlOverflow
if (sessionPrincipal != null)
{
- if (_overfull.compareAndSet(false, true))
+ if (_overfullReported.compareAndSet(false, true))
{
_eventLogger.message(_queue.getLogSubject(),
QueueMessages.OVERFULL(queueDepthBytes,
@@ -210,14 +207,6 @@ public class ProducerFlowControlOverflow
final AMQPSession<?, ?> session = sessionPrincipal.getSession();
_blockedSessions.add(session);
session.block(_queue);
-
- if (isUnderfull(queueDepthBytes, maximumQueueDepthBytes)
- && isUnderfull(queueDepthMessages, maximumQueueDepthMessages))
- {
-
- session.unblock(_queue);
- _blockedSessions.remove(session);
- }
}
}
}
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandlerTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandlerTest.java?rev=1784267&r1=1784266&r2=1784267&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandlerTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandlerTest.java Fri Feb 24 11:39:08 2017
@@ -24,6 +24,7 @@ import static org.mockito.Matchers.same;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import java.security.PrivilegedAction;
@@ -70,7 +71,7 @@ public class ProducerFlowControlOverflow
_producerFlowControlOverflowPolicyHandler = new ProducerFlowControlOverflowPolicyHandler(_queue, _eventLogger);
}
- public void testHandleOverflowBlocksOverfullBytes() throws Exception
+ public void testCheckOverflowBlocksSessionWhenOverfullBytes() throws Exception
{
AMQPSession<?, ?> session = mock(AMQPSession.class);
when(_queue.getQueueDepthBytes()).thenReturn(11L);
@@ -81,19 +82,23 @@ public class ProducerFlowControlOverflow
verify(session, times(1)).block(_queue);
LogMessage logMessage = QueueMessages.OVERFULL(11, 10, 0, -1);
verify(_eventLogger).message(same(_subject), argThat(new LogMessageMatcher(logMessage)));
+ verifyNoMoreInteractions(_eventLogger);
+ verifyNoMoreInteractions(session);
}
- public void testHandleOverflowBlocksOverfullMessages() throws Exception
+ public void testCheckOverflowBlocksSessionWhenOverfullMessages() throws Exception
{
AMQPSession<?, ?> session = mock(AMQPSession.class);
-
when(_queue.getMaximumQueueDepthMessages()).thenReturn(10L);
when(_queue.getQueueDepthMessages()).thenReturn(11);
+
checkOverflow(session);
- verify(session, times(1)).block(_queue);
+ verify(session, times(1)).block(_queue);
LogMessage logMessage = QueueMessages.OVERFULL(0, -1, 11, 10);
verify(_eventLogger).message(same(_subject), argThat(new LogMessageMatcher(logMessage)));
+ verifyNoMoreInteractions(_eventLogger);
+ verifyNoMoreInteractions(session);
}
public void testIsQueueFlowStopped() throws Exception
@@ -108,7 +113,7 @@ public class ProducerFlowControlOverflow
assertTrue("Flow should be stopped", _producerFlowControlOverflowPolicyHandler.isQueueFlowStopped());
}
- public void testCheckCapacityResumesFlowForBytes() throws Exception
+ public void testCheckOverflowResumesFlowWhenUnderfullBytes() throws Exception
{
AMQPSession<?, ?> session = mock(AMQPSession.class);
when(_queue.getQueueDepthBytes()).thenReturn(11L);
@@ -116,18 +121,24 @@ public class ProducerFlowControlOverflow
checkOverflow(session);
+ verify(session, times(1)).block(_queue);
+ LogMessage overfullMessage = QueueMessages.OVERFULL(11, 10, 0, -1);
+ verify(_eventLogger).message(same(_subject), argThat(new LogMessageMatcher(overfullMessage)));
assertTrue("Flow should be stopped", _producerFlowControlOverflowPolicyHandler.isQueueFlowStopped());
+
when(_queue.getQueueDepthBytes()).thenReturn(8L);
_producerFlowControlOverflowPolicyHandler.checkOverflow();
verify(session, times(1)).unblock(_queue);
assertFalse("Flow should not be stopped", _producerFlowControlOverflowPolicyHandler.isQueueFlowStopped());
- LogMessage logMessage = QueueMessages.UNDERFULL(8, 8, 0, -1);
- verify(_eventLogger).message(same(_subject), argThat(new LogMessageMatcher(logMessage)));
+ LogMessage underfullMessage = QueueMessages.UNDERFULL(8, 8, 0, -1);
+ verify(_eventLogger).message(same(_subject), argThat(new LogMessageMatcher(underfullMessage)));
+ verifyNoMoreInteractions(_eventLogger);
+ verifyNoMoreInteractions(session);
}
- public void testCheckCapacityResumesFlowForMessages() throws Exception
+ public void testCheckOverflowResumesFlowWhenUnderfullMessages() throws Exception
{
AMQPSession<?, ?> session = mock(AMQPSession.class);
when(_queue.getQueueDepthMessages()).thenReturn(11);
@@ -135,15 +146,21 @@ public class ProducerFlowControlOverflow
checkOverflow(session);
+ verify(session, times(1)).block(_queue);
+ LogMessage overfullMessage = QueueMessages.OVERFULL(0, -1, 11, 10);
+ verify(_eventLogger).message(same(_subject), argThat(new LogMessageMatcher(overfullMessage)));
assertTrue("Flow should be stopped", _producerFlowControlOverflowPolicyHandler.isQueueFlowStopped());
+
when(_queue.getQueueDepthMessages()).thenReturn(8);
_producerFlowControlOverflowPolicyHandler.checkOverflow();
verify(session, times(1)).unblock(_queue);
assertFalse("Flow should not be stopped", _producerFlowControlOverflowPolicyHandler.isQueueFlowStopped());
- LogMessage logMessage = QueueMessages.UNDERFULL(0, -1, 8, 8);
- verify(_eventLogger).message(same(_subject), argThat(new LogMessageMatcher(logMessage)));
+ LogMessage underfullMessage = QueueMessages.UNDERFULL(0, -1, 8, 8);
+ verify(_eventLogger).message(same(_subject), argThat(new LogMessageMatcher(underfullMessage)));
+ verifyNoMoreInteractions(_eventLogger);
+ verifyNoMoreInteractions(session);
}
private void checkOverflow(AMQPSession<?, ?> session)
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/RingOverflowPolicyHandlerTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/RingOverflowPolicyHandlerTest.java?rev=1784267&r1=1784266&r2=1784267&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/RingOverflowPolicyHandlerTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/RingOverflowPolicyHandlerTest.java Fri Feb 24 11:39:08 2017
@@ -19,10 +19,13 @@
package org.apache.qpid.server.queue;
+import static org.mockito.Matchers.any;
import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import org.apache.qpid.server.logging.EventLogger;
@@ -61,7 +64,7 @@ public class RingOverflowPolicyHandlerTe
_ringOverflowPolicyHandler = new RingOverflowPolicyHandler(_queue, _eventLogger);
}
- public void testHandleOverflowBytes() throws Exception
+ public void testCheckOverflowWhenOverfullBytes() throws Exception
{
QueueEntry lastEntry = createLastEntry();
when(_queue.getLesserOldestEntry()).thenReturn(lastEntry, (QueueEntry) null);
@@ -72,12 +75,12 @@ public class RingOverflowPolicyHandlerTe
_ringOverflowPolicyHandler.checkOverflow();
verify(_queue).deleteEntry(lastEntry);
-
LogMessage dropped = QueueMessages.DROPPED(1L, 4, 1, 5,-1);
verify(_eventLogger).message(same(_subject), argThat(new LogMessageMatcher(dropped)));
+ verifyNoMoreInteractions(_eventLogger);
}
- public void testHandleOverflowMessages() throws Exception
+ public void testCheckOverflowWhenOverfullMessages() throws Exception
{
QueueEntry lastEntry = createLastEntry();
when(_queue.getLesserOldestEntry()).thenReturn(lastEntry, (QueueEntry) null);
@@ -88,9 +91,33 @@ public class RingOverflowPolicyHandlerTe
_ringOverflowPolicyHandler.checkOverflow();
verify((AbstractQueue<?>) _queue).deleteEntry(lastEntry);
-
LogMessage dropped = QueueMessages.DROPPED(1, 4, 5, -1,5);
verify(_eventLogger).message(same(_subject), argThat(new LogMessageMatcher(dropped)));
+ verifyNoMoreInteractions(_eventLogger);
+ }
+
+ public void testCheckOverflowWhenUnderfullBytes() throws Exception
+ {
+ when(_queue.getQueueDepthBytesIncludingHeader()).thenReturn(5L);
+ when(_queue.getMaximumQueueDepthBytes()).thenReturn(5L);
+ when(_queue.getQueueDepthMessages()).thenReturn(3);
+
+ _ringOverflowPolicyHandler.checkOverflow();
+
+ verify(_queue, never()).deleteEntry(any(QueueEntry.class));
+ verifyNoMoreInteractions(_eventLogger);
+ }
+
+ public void testCheckOverflowWhenUnderfullMessages() throws Exception
+ {
+ when(_queue.getQueueDepthMessages()).thenReturn(5);
+ when(_queue.getMaximumQueueDepthMessages()).thenReturn(5L);
+ when(_queue.getQueueDepthBytesIncludingHeader()).thenReturn(10L);
+
+ _ringOverflowPolicyHandler.checkOverflow();
+
+ verify(_queue, never()).deleteEntry(any(QueueEntry.class));
+ verifyNoMoreInteractions(_eventLogger);
}
private QueueEntry createLastEntry()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org