You are viewing a plain-text version of this content. The hyperlink to the canonical version was not preserved in this plain-text rendering.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/02/24 11:39:09 UTC

svn commit: r1784267 - in /qpid/java/trunk/broker-core/src: main/java/org/apache/qpid/server/queue/ test/java/org/apache/qpid/server/queue/

Author: orudyy
Date: Fri Feb 24 11:39:08 2017
New Revision: 1784267

URL: http://svn.apache.org/viewvc?rev=1784267&view=rev
Log:
QPID-7618: Simplify handling of overflow/underflow for producer flow control as suggested by Lorenz Quack

Modified:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandler.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandlerTest.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/RingOverflowPolicyHandlerTest.java

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandler.java?rev=1784267&r1=1784266&r2=1784267&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandler.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandler.java Fri Feb 24 11:39:08 2017
@@ -61,7 +61,7 @@ public class ProducerFlowControlOverflow
     {
         private final Queue<?> _queue;
         private final EventLogger _eventLogger;
-        private final AtomicBoolean _overfull = new AtomicBoolean(false);
+        private final AtomicBoolean _overfullReported = new AtomicBoolean(false);
         private final Set<AMQPSession<?, ?>> _blockedSessions =
                 Collections.newSetFromMap(new ConcurrentHashMap<AMQPSession<?, ?>, Boolean>());
         private volatile double _queueFlowResumeLimit;
@@ -132,7 +132,7 @@ public class ProducerFlowControlOverflow
                 _queue.removeChangeListener(this);
                 checkUnderfull(-1, -1);
 
-                if (_overfull.compareAndSet(true, false))
+                if (_overfullReported.compareAndSet(true, false))
                 {
                     _eventLogger.message(_queue.getLogSubject(),
                                          QueueMessages.UNDERFULL(_queue.getQueueDepthBytes(),
@@ -151,33 +151,30 @@ public class ProducerFlowControlOverflow
 
         boolean isQueueFlowStopped()
         {
-            return _overfull.get();
+            return _overfullReported.get();
         }
 
         private void checkUnderfull(long maximumQueueDepthBytes, long maximumQueueDepthMessages)
         {
-            if (_overfull.get())
-            {
-                long queueDepthBytes = _queue.getQueueDepthBytes();
-                long queueDepthMessages = _queue.getQueueDepthMessages();
+            long queueDepthBytes = _queue.getQueueDepthBytes();
+            long queueDepthMessages = _queue.getQueueDepthMessages();
 
-                if (isUnderfull(queueDepthBytes, maximumQueueDepthBytes)
-                    && isUnderfull(queueDepthMessages, maximumQueueDepthMessages))
+            if (isUnderfull(queueDepthBytes, maximumQueueDepthBytes)
+                && isUnderfull(queueDepthMessages, maximumQueueDepthMessages))
+            {
+                if (_overfullReported.compareAndSet(true, false))
                 {
-                    if (_overfull.compareAndSet(true, false))
-                    {
-                        _eventLogger.message(_queue.getLogSubject(),
-                                             QueueMessages.UNDERFULL(queueDepthBytes,
-                                                                     getFlowResumeLimit(maximumQueueDepthBytes),
-                                                                     queueDepthMessages,
-                                                                     getFlowResumeLimit(maximumQueueDepthMessages)));
-                    }
+                    _eventLogger.message(_queue.getLogSubject(),
+                                         QueueMessages.UNDERFULL(queueDepthBytes,
+                                                                 getFlowResumeLimit(maximumQueueDepthBytes),
+                                                                 queueDepthMessages,
+                                                                 getFlowResumeLimit(maximumQueueDepthMessages)));
+                }
 
-                    for (final AMQPSession<?, ?> blockedSession : _blockedSessions)
-                    {
-                        blockedSession.unblock(_queue);
-                        _blockedSessions.remove(blockedSession);
-                    }
+                for (final AMQPSession<?, ?> blockedSession : _blockedSessions)
+                {
+                    blockedSession.unblock(_queue);
+                    _blockedSessions.remove(blockedSession);
                 }
             }
         }
@@ -198,7 +195,7 @@ public class ProducerFlowControlOverflow
                     if (sessionPrincipal != null)
                     {
 
-                        if (_overfull.compareAndSet(false, true))
+                        if (_overfullReported.compareAndSet(false, true))
                         {
                             _eventLogger.message(_queue.getLogSubject(),
                                                  QueueMessages.OVERFULL(queueDepthBytes,
@@ -210,14 +207,6 @@ public class ProducerFlowControlOverflow
                         final AMQPSession<?, ?> session = sessionPrincipal.getSession();
                         _blockedSessions.add(session);
                         session.block(_queue);
-
-                        if (isUnderfull(queueDepthBytes, maximumQueueDepthBytes)
-                            && isUnderfull(queueDepthMessages, maximumQueueDepthMessages))
-                        {
-
-                            session.unblock(_queue);
-                            _blockedSessions.remove(session);
-                        }
                     }
                 }
             }

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandlerTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandlerTest.java?rev=1784267&r1=1784266&r2=1784267&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandlerTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandlerTest.java Fri Feb 24 11:39:08 2017
@@ -24,6 +24,7 @@ import static org.mockito.Matchers.same;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
 import java.security.PrivilegedAction;
@@ -70,7 +71,7 @@ public class ProducerFlowControlOverflow
         _producerFlowControlOverflowPolicyHandler = new ProducerFlowControlOverflowPolicyHandler(_queue, _eventLogger);
     }
 
-    public void testHandleOverflowBlocksOverfullBytes() throws Exception
+    public void testCheckOverflowBlocksSessionWhenOverfullBytes() throws Exception
     {
         AMQPSession<?, ?> session = mock(AMQPSession.class);
         when(_queue.getQueueDepthBytes()).thenReturn(11L);
@@ -81,19 +82,23 @@ public class ProducerFlowControlOverflow
         verify(session, times(1)).block(_queue);
         LogMessage logMessage = QueueMessages.OVERFULL(11, 10, 0, -1);
         verify(_eventLogger).message(same(_subject), argThat(new LogMessageMatcher(logMessage)));
+        verifyNoMoreInteractions(_eventLogger);
+        verifyNoMoreInteractions(session);
     }
 
-    public void testHandleOverflowBlocksOverfullMessages() throws Exception
+    public void testCheckOverflowBlocksSessionWhenOverfullMessages() throws Exception
     {
         AMQPSession<?, ?> session = mock(AMQPSession.class);
-
         when(_queue.getMaximumQueueDepthMessages()).thenReturn(10L);
         when(_queue.getQueueDepthMessages()).thenReturn(11);
+
         checkOverflow(session);
-        verify(session, times(1)).block(_queue);
 
+        verify(session, times(1)).block(_queue);
         LogMessage logMessage = QueueMessages.OVERFULL(0, -1, 11, 10);
         verify(_eventLogger).message(same(_subject), argThat(new LogMessageMatcher(logMessage)));
+        verifyNoMoreInteractions(_eventLogger);
+        verifyNoMoreInteractions(session);
     }
 
     public void testIsQueueFlowStopped() throws Exception
@@ -108,7 +113,7 @@ public class ProducerFlowControlOverflow
         assertTrue("Flow should be stopped", _producerFlowControlOverflowPolicyHandler.isQueueFlowStopped());
     }
 
-    public void testCheckCapacityResumesFlowForBytes() throws Exception
+    public void testCheckOverflowResumesFlowWhenUnderfullBytes() throws Exception
     {
         AMQPSession<?, ?> session = mock(AMQPSession.class);
         when(_queue.getQueueDepthBytes()).thenReturn(11L);
@@ -116,18 +121,24 @@ public class ProducerFlowControlOverflow
 
         checkOverflow(session);
 
+        verify(session, times(1)).block(_queue);
+        LogMessage overfullMessage = QueueMessages.OVERFULL(11, 10, 0, -1);
+        verify(_eventLogger).message(same(_subject), argThat(new LogMessageMatcher(overfullMessage)));
         assertTrue("Flow should be stopped", _producerFlowControlOverflowPolicyHandler.isQueueFlowStopped());
+
         when(_queue.getQueueDepthBytes()).thenReturn(8L);
 
         _producerFlowControlOverflowPolicyHandler.checkOverflow();
 
         verify(session, times(1)).unblock(_queue);
         assertFalse("Flow should not be stopped", _producerFlowControlOverflowPolicyHandler.isQueueFlowStopped());
-        LogMessage logMessage = QueueMessages.UNDERFULL(8, 8, 0, -1);
-        verify(_eventLogger).message(same(_subject), argThat(new LogMessageMatcher(logMessage)));
+        LogMessage underfullMessage = QueueMessages.UNDERFULL(8, 8, 0, -1);
+        verify(_eventLogger).message(same(_subject), argThat(new LogMessageMatcher(underfullMessage)));
+        verifyNoMoreInteractions(_eventLogger);
+        verifyNoMoreInteractions(session);
     }
 
-    public void testCheckCapacityResumesFlowForMessages() throws Exception
+    public void testCheckOverflowResumesFlowWhenUnderfullMessages() throws Exception
     {
         AMQPSession<?, ?> session = mock(AMQPSession.class);
         when(_queue.getQueueDepthMessages()).thenReturn(11);
@@ -135,15 +146,21 @@ public class ProducerFlowControlOverflow
 
         checkOverflow(session);
 
+        verify(session, times(1)).block(_queue);
+        LogMessage overfullMessage = QueueMessages.OVERFULL(0, -1, 11, 10);
+        verify(_eventLogger).message(same(_subject), argThat(new LogMessageMatcher(overfullMessage)));
         assertTrue("Flow should be stopped", _producerFlowControlOverflowPolicyHandler.isQueueFlowStopped());
+
         when(_queue.getQueueDepthMessages()).thenReturn(8);
 
         _producerFlowControlOverflowPolicyHandler.checkOverflow();
 
         verify(session, times(1)).unblock(_queue);
         assertFalse("Flow should not be stopped", _producerFlowControlOverflowPolicyHandler.isQueueFlowStopped());
-        LogMessage logMessage = QueueMessages.UNDERFULL(0, -1, 8, 8);
-        verify(_eventLogger).message(same(_subject), argThat(new LogMessageMatcher(logMessage)));
+        LogMessage underfullMessage = QueueMessages.UNDERFULL(0, -1, 8, 8);
+        verify(_eventLogger).message(same(_subject), argThat(new LogMessageMatcher(underfullMessage)));
+        verifyNoMoreInteractions(_eventLogger);
+        verifyNoMoreInteractions(session);
     }
 
     private void checkOverflow(AMQPSession<?, ?> session)

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/RingOverflowPolicyHandlerTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/RingOverflowPolicyHandlerTest.java?rev=1784267&r1=1784266&r2=1784267&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/RingOverflowPolicyHandlerTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/RingOverflowPolicyHandlerTest.java Fri Feb 24 11:39:08 2017
@@ -19,10 +19,13 @@
 
 package org.apache.qpid.server.queue;
 
+import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.argThat;
 import static org.mockito.Matchers.same;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
 import org.apache.qpid.server.logging.EventLogger;
@@ -61,7 +64,7 @@ public class RingOverflowPolicyHandlerTe
         _ringOverflowPolicyHandler = new RingOverflowPolicyHandler(_queue, _eventLogger);
     }
 
-    public void testHandleOverflowBytes() throws Exception
+    public void testCheckOverflowWhenOverfullBytes() throws Exception
     {
         QueueEntry lastEntry = createLastEntry();
         when(_queue.getLesserOldestEntry()).thenReturn(lastEntry, (QueueEntry) null);
@@ -72,12 +75,12 @@ public class RingOverflowPolicyHandlerTe
         _ringOverflowPolicyHandler.checkOverflow();
 
         verify(_queue).deleteEntry(lastEntry);
-
         LogMessage dropped = QueueMessages.DROPPED(1L, 4, 1, 5,-1);
         verify(_eventLogger).message(same(_subject), argThat(new LogMessageMatcher(dropped)));
+        verifyNoMoreInteractions(_eventLogger);
     }
 
-    public void testHandleOverflowMessages() throws Exception
+    public void testCheckOverflowWhenOverfullMessages() throws Exception
     {
         QueueEntry lastEntry = createLastEntry();
         when(_queue.getLesserOldestEntry()).thenReturn(lastEntry, (QueueEntry) null);
@@ -88,9 +91,33 @@ public class RingOverflowPolicyHandlerTe
         _ringOverflowPolicyHandler.checkOverflow();
 
         verify((AbstractQueue<?>) _queue).deleteEntry(lastEntry);
-
         LogMessage dropped = QueueMessages.DROPPED(1, 4, 5, -1,5);
         verify(_eventLogger).message(same(_subject), argThat(new LogMessageMatcher(dropped)));
+        verifyNoMoreInteractions(_eventLogger);
+    }
+
+    public void testCheckOverflowWhenUnderfullBytes() throws Exception
+    {
+        when(_queue.getQueueDepthBytesIncludingHeader()).thenReturn(5L);
+        when(_queue.getMaximumQueueDepthBytes()).thenReturn(5L);
+        when(_queue.getQueueDepthMessages()).thenReturn(3);
+
+        _ringOverflowPolicyHandler.checkOverflow();
+
+        verify(_queue, never()).deleteEntry(any(QueueEntry.class));
+        verifyNoMoreInteractions(_eventLogger);
+    }
+
+    public void testCheckOverflowWhenUnderfullMessages() throws Exception
+    {
+        when(_queue.getQueueDepthMessages()).thenReturn(5);
+        when(_queue.getMaximumQueueDepthMessages()).thenReturn(5L);
+        when(_queue.getQueueDepthBytesIncludingHeader()).thenReturn(10L);
+
+        _ringOverflowPolicyHandler.checkOverflow();
+
+        verify(_queue, never()).deleteEntry(any(QueueEntry.class));
+        verifyNoMoreInteractions(_eventLogger);
     }
 
     private QueueEntry createLastEntry()



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org