You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2016/09/08 14:16:35 UTC

svn commit: r1759830 - in /qpid/java/trunk/broker-plugins: amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/

Author: lquack
Date: Thu Sep  8 14:16:35 2016
New Revision: 1759830

URL: http://svn.apache.org/viewvc?rev=1759830&view=rev
Log:
QPID-7387: [Java Broker] 0-8..0-91 Correct handling of consumer credit

Added:
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManagerTest.java
Modified:
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java?rev=1759830&r1=1759829&r2=1759830&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java Thu Sep  8 14:16:35 2016
@@ -75,7 +75,7 @@ public class CreditCreditManager extends
 
     }
 
-    public void clearCredit()
+    public synchronized void clearCredit()
     {
         _bytesCredit = 0l;
         _messageCredit = 0l;
@@ -145,18 +145,4 @@ public class CreditCreditManager extends
 
     }
 
-    public synchronized void stop()
-    {
-        if(_bytesCredit > 0)
-        {
-            _bytesCredit = 0;
-        }
-        if(_messageCredit > 0)
-        {
-            _messageCredit = 0;
-        }
-
-    }
-
-
 }

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java?rev=1759830&r1=1759829&r2=1759830&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java Thu Sep  8 14:16:35 2016
@@ -49,19 +49,19 @@ public class WindowCreditManager extends
 
     }
 
-    public long getMessageCreditLimit()
+    public synchronized long getMessageCreditLimit()
     {
         return _messageCreditLimit;
     }
 
-    public long getMessageCredit()
+    synchronized long getMessageCredit()
     {
          return _messageCreditLimit == -1L
                     ? Long.MAX_VALUE
                     : _messageUsed < _messageCreditLimit ? _messageCreditLimit - _messageUsed : 0L;
     }
 
-    public long getBytesCredit()
+    synchronized long getBytesCredit()
     {
         return _bytesCreditLimit == -1L
                     ? Long.MAX_VALUE
@@ -87,7 +87,7 @@ public class WindowCreditManager extends
         _bytesUsed -= bytesCredit;
         if(_bytesUsed < 0L)
         {
-            LOGGER.error("Bytes credit used value was negative: "+ _messageUsed);
+            LOGGER.error("Bytes credit used value was negative: "+ _bytesUsed);
             _bytesUsed = 0;
         }
 
@@ -192,7 +192,7 @@ public class WindowCreditManager extends
         }
     }
 
-    public void clearCredit()
+    public synchronized void clearCredit()
     {
         _bytesCreditLimit = 0l;
         _messageCreditLimit = 0l;

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java?rev=1759830&r1=1759829&r2=1759830&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java Thu Sep  8 14:16:35 2016
@@ -21,9 +21,9 @@
 package org.apache.qpid.server.protocol.v0_8;
 
 
-import org.apache.qpid.server.transport.ProtocolEngine;
 import org.apache.qpid.server.flow.AbstractFlowCreditManager;
 import org.apache.qpid.server.flow.FlowCreditManager;
+import org.apache.qpid.server.transport.ProtocolEngine;
 
 public class Pre0_10CreditManager extends AbstractFlowCreditManager implements FlowCreditManager
 {
@@ -47,82 +47,62 @@ public class Pre0_10CreditManager extend
     }
 
 
-
     public synchronized void setCreditLimits(final long bytesCreditLimit, final long messageCreditLimit)
     {
         long bytesCreditChange = bytesCreditLimit - _bytesCreditLimit;
         long messageCreditChange = messageCreditLimit - _messageCreditLimit;
 
-
-
-        if(bytesCreditChange != 0L)
+        if (bytesCreditChange != 0L)
         {
-            if(bytesCreditLimit == 0L)
-            {
-                _bytesCredit = 0;
-            }
-            else
-            {
-                _bytesCredit += bytesCreditChange;
-            }
+            _bytesCredit += bytesCreditChange;
         }
 
-
-        if(messageCreditChange != 0L)
+        if (messageCreditChange != 0L)
         {
-            if(messageCreditLimit == 0L)
-            {
-                _messageCredit = 0;
-            }
-            else
-            {
-                _messageCredit += messageCreditChange;
-            }
+            _messageCredit += messageCreditChange;
         }
 
-
         _bytesCreditLimit = bytesCreditLimit;
         _messageCreditLimit = messageCreditLimit;
 
         setSuspended(!hasCredit());
-
     }
 
-
     public synchronized void restoreCredit(final long messageCredit, final long bytesCredit)
     {
-        final long messageCreditLimit = _messageCreditLimit;
-        boolean notifyIncrease = true;
-        if(messageCreditLimit != 0L)
+        final boolean hadCredit = hasCredit();
+
+        _messageCredit += messageCredit;
+        if (_messageCredit > _messageCreditLimit)
         {
-            notifyIncrease = (_messageCredit != 0);
-            long newCredit = _messageCredit + messageCredit;
-            _messageCredit = newCredit > messageCreditLimit ? messageCreditLimit : newCredit;
+            throw new IllegalStateException(String.format(
+                    "Consumer credit accounting error. Restored more credit than we ever had: messageCredit=%d  messageCreditLimit=%d",
+                    _messageCredit,
+                    _messageCreditLimit));
         }
 
-
-        final long bytesCreditLimit = _bytesCreditLimit;
-        if(bytesCreditLimit != 0L)
+        _bytesCredit += bytesCredit;
+        if (_bytesCredit > _bytesCreditLimit)
         {
-            long newCredit = _bytesCredit + bytesCredit;
-            _bytesCredit = newCredit > bytesCreditLimit ? bytesCreditLimit : newCredit;
-            if(notifyIncrease && bytesCredit>0)
-            {
-                notifyIncreaseBytesCredit();
-            }
+            throw new IllegalStateException(String.format(
+                    "Consumer credit accounting error. Restored more credit than we ever had: bytesCredit=%d  bytesCreditLimit=%d",
+                    _bytesCredit,
+                    _bytesCreditLimit));
         }
 
-
+        if (_bytesCreditLimit != 0 && _bytesCredit > 0 && bytesCredit > 0 && hadCredit)
+        {
+            notifyIncreaseBytesCredit();
+        }
 
         setSuspended(!hasCredit());
-
     }
 
     public synchronized boolean hasCredit()
     {
         return (_bytesCreditLimit == 0L || _bytesCredit > 0)
-                && (_messageCreditLimit == 0L || _messageCredit > 0)
-                && !_protocolEngine.isTransportBlockedForWriting();
+               && (_messageCreditLimit == 0L || _messageCredit > 0)
+               && !_protocolEngine.isTransportBlockedForWriting();
     }
 
     public synchronized boolean useCreditForMessage(final long msgSize)
@@ -132,59 +112,26 @@ public class Pre0_10CreditManager extend
             setSuspended(true);
             return false;
         }
-        else if(_messageCreditLimit != 0L)
+
+        if (_messageCreditLimit != 0)
         {
-            if(_messageCredit != 0L)
-            {
-                if(_bytesCreditLimit == 0L)
-                {
-                    _messageCredit--;
-
-                    return true;
-                }
-                else
-                {
-                    if((_bytesCredit >= msgSize) || (_bytesCredit == _bytesCreditLimit))
-                    {
-                        _messageCredit--;
-                        _bytesCredit -= msgSize;
-
-                        return true;
-                    }
-                    else
-                    {
-                        return false;
-                    }
-                }
-            }
-            else
+            if (_messageCredit <= 0)
             {
                 setSuspended(true);
                 return false;
             }
         }
-        else
+        if (_bytesCreditLimit != 0)
         {
-            if(_bytesCreditLimit == 0L)
-            {
-
-                return true;
-            }
-            else
+            if ((_bytesCredit < msgSize) && (_bytesCredit != _bytesCreditLimit))
             {
-                if((_bytesCredit >= msgSize) || (_bytesCredit == _bytesCreditLimit))
-                {
-                    _bytesCredit -= msgSize;
-
-                    return true;
-                }
-                else
-                {
-                    return false;
-                }
+                setSuspended(!hasCredit());
+                return false;
             }
-
         }
 
+        _messageCredit--;
+        _bytesCredit -= msgSize;
+        return true;
     }
 }

Added: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManagerTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManagerTest.java?rev=1759830&view=auto
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManagerTest.java (added)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManagerTest.java Thu Sep  8 14:16:35 2016
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.server.protocol.v0_8;
+
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
+import org.apache.qpid.server.flow.FlowCreditManager;
+import org.apache.qpid.server.transport.ProtocolEngine;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+public class Pre0_10CreditManagerTest extends QpidTestCase
+{
+    private Pre0_10CreditManager _creditManager;
+    private ProtocolEngine _protocolEngine;
+
+    @Override
+    public void setUp() throws Exception
+    {
+        super.setUp();
+        _protocolEngine = mock(ProtocolEngine.class);
+    }
+
+    public void testBasicMessageCredit() throws Exception
+    {
+        _creditManager = new Pre0_10CreditManager(0, 0, _protocolEngine);
+        _creditManager.setCreditLimits(0, 2);
+        assertTrue("Creditmanager should have credit", _creditManager.hasCredit());
+        assertTrue("Creditmanager should be able to useCredit", _creditManager.useCreditForMessage(37));
+        assertTrue("Creditmanager should have credit", _creditManager.hasCredit());
+        assertTrue("Creditmanager should be able to useCredit", _creditManager.useCreditForMessage(37));
+        assertFalse("Creditmanager should have credit", _creditManager.hasCredit());
+        assertFalse("Creditmanager should be able to useCredit", _creditManager.useCreditForMessage(37));
+        _creditManager.restoreCredit(1, 37);
+        assertTrue("Creditmanager should have credit", _creditManager.hasCredit());
+        assertTrue("Creditmanager should be able to useCredit", _creditManager.useCreditForMessage(37));
+    }
+
+    public void testBytesLimitDoesNotPreventLargeMessage() throws Exception
+    {
+        _creditManager = new Pre0_10CreditManager(0, 0, _protocolEngine);
+        _creditManager.setCreditLimits(10, 0);
+        assertTrue("Creditmanager should be able to useCredit", _creditManager.useCreditForMessage(3));
+        assertFalse("Creditmanager should not be able to useCredit", _creditManager.useCreditForMessage(30));
+        _creditManager.restoreCredit(1, 3);
+        assertTrue("Creditmanager should be able to useCredit", _creditManager.useCreditForMessage(30));
+    }
+
+    public void testUseCreditWithNegativeMessageCredit() throws Exception
+    {
+        _creditManager = new Pre0_10CreditManager(0, 0, _protocolEngine);
+        _creditManager.setCreditLimits(0, 3);
+        assertTrue("Creditmanager should be able to useCredit", _creditManager.useCreditForMessage(37));
+        assertTrue("Creditmanager should be able to useCredit", _creditManager.useCreditForMessage(37));
+        assertTrue("Creditmanager should be able to useCredit", _creditManager.useCreditForMessage(37));
+        _creditManager.setCreditLimits(0, 1); // This should get us to credit=-2
+        assertFalse("Creditmanager should not have credit", _creditManager.hasCredit());
+        assertFalse("Creditmanager should not be able to useCredit", _creditManager.useCreditForMessage(37));
+        _creditManager.restoreCredit(1, 37);
+        assertFalse("Creditmanager should not have credit", _creditManager.hasCredit());
+        _creditManager.restoreCredit(1, 37);
+        assertFalse("Creditmanager should not have credit", _creditManager.hasCredit());
+        _creditManager.restoreCredit(1, 37);
+        assertTrue("Creditmanager should have credit", _creditManager.hasCredit());
+    }
+
+    public void testUseCreditWithNegativeBytesCredit() throws Exception
+    {
+        _creditManager = new Pre0_10CreditManager(0, 0, _protocolEngine);
+        _creditManager.setCreditLimits(3, 0);
+        assertTrue("Creditmanager should be able to useCredit", _creditManager.useCreditForMessage(1));
+        assertTrue("Creditmanager should be able to useCredit", _creditManager.useCreditForMessage(1));
+        assertTrue("Creditmanager should be able to useCredit", _creditManager.useCreditForMessage(1));
+        _creditManager.setCreditLimits(1, 0); // This should get us to credit=-2
+        assertFalse("Creditmanager should not have credit", _creditManager.hasCredit());
+        assertFalse("Creditmanager should not be able to useCredit", _creditManager.useCreditForMessage(1));
+        _creditManager.restoreCredit(1, 1);
+        assertFalse("Creditmanager should not have credit", _creditManager.hasCredit());
+        _creditManager.restoreCredit(1, 1);
+        assertFalse("Creditmanager should not have credit", _creditManager.hasCredit());
+        _creditManager.restoreCredit(1, 1);
+        assertTrue("Creditmanager should have credit", _creditManager.hasCredit());
+    }
+
+    public void testCreditAccountingWhileMessageLimitNotSet() throws Exception
+    {
+        _creditManager = new Pre0_10CreditManager(0, 0, _protocolEngine);
+        assertTrue("Creditmanager should be able to useCredit", _creditManager.useCreditForMessage(37));
+        assertTrue("Creditmanager should be able to useCredit", _creditManager.useCreditForMessage(37));
+        assertTrue("Creditmanager should be able to useCredit", _creditManager.useCreditForMessage(37));
+        _creditManager.restoreCredit(1, 37);
+        _creditManager.setCreditLimits(37, 1); // This should get us to credit=-1
+        assertFalse("Creditmanager should not have credit", _creditManager.hasCredit());
+        assertFalse("Creditmanager should not be able to useCredit", _creditManager.useCreditForMessage(37));
+        _creditManager.restoreCredit(1, 37);
+        assertFalse("Creditmanager should not have credit", _creditManager.hasCredit());
+        _creditManager.restoreCredit(1, 37);
+        assertTrue("Creditmanager should have credit", _creditManager.hasCredit());
+    }
+
+    public void testMessageCreditExhaustionSuspends() throws Exception
+    {
+        _creditManager = new Pre0_10CreditManager(0, 0, _protocolEngine);
+        final FlowCreditManager.FlowCreditManagerListener flowListener =
+                mock(FlowCreditManager.FlowCreditManagerListener.class);
+        _creditManager.addStateListener(flowListener);
+        _creditManager.setCreditLimits(0, 1);
+        assertTrue("Creditmanager should be able to useCredit", _creditManager.useCreditForMessage(37));
+        verify(flowListener, never()).creditStateChanged(anyBoolean());
+        assertFalse("Creditmanager should not be able to useCredit", _creditManager.useCreditForMessage(37));
+        verify(flowListener).creditStateChanged(false);
+        _creditManager.restoreCredit(1, 0);
+        verify(flowListener).creditStateChanged(true);
+    }
+
+    public void testBytesCreditExhaustionSuspends() throws Exception
+    {
+        _creditManager = new Pre0_10CreditManager(0, 0, _protocolEngine);
+        final FlowCreditManager.FlowCreditManagerListener flowListener =
+                mock(FlowCreditManager.FlowCreditManagerListener.class);
+        _creditManager.addStateListener(flowListener);
+        _creditManager.setCreditLimits(10, 0);
+        assertTrue("Creditmanager should be able to useCredit", _creditManager.useCreditForMessage(7));
+        assertFalse("Creditmanager should be able to useCredit", _creditManager.useCreditForMessage(37));
+        verify(flowListener, never()).creditStateChanged(anyBoolean());
+        assertTrue("Creditmanager should be able to useCredit", _creditManager.useCreditForMessage(3));
+        assertFalse("Creditmanager should be able to useCredit", _creditManager.useCreditForMessage(3));
+        verify(flowListener).creditStateChanged(false);
+        _creditManager.restoreCredit(1, 3);
+        verify(flowListener).creditStateChanged(true);
+    }
+
+    public void testNotifiedAfterBytesCreditIncreased() throws Exception
+    {
+        _creditManager = new Pre0_10CreditManager(0, 0, _protocolEngine);
+        final FlowCreditManager.FlowCreditManagerListener flowListener =
+                mock(FlowCreditManager.FlowCreditManagerListener.class);
+        _creditManager.addStateListener(flowListener);
+        _creditManager.setCreditLimits(10, 0);
+
+        assertTrue("Creditmanager should be able to useCredit", _creditManager.useCreditForMessage(4));
+        assertTrue("Creditmanager should be able to useCredit", _creditManager.useCreditForMessage(4));
+        verify(flowListener, never()).creditStateChanged(anyBoolean());
+
+        _creditManager.restoreCredit(1, 4);
+        verify(flowListener).creditStateChanged(true);
+    }
+}



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org