You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ro...@apache.org on 2018/08/28 17:06:18 UTC

[1/2] activemq-artemis git commit: ARTEMIS-2057 Fix runaway credit grants

Repository: activemq-artemis
Updated Branches:
  refs/heads/master 1e26a8a2e -> 09f9159ea


ARTEMIS-2057 Fix runaway credit grants

Ensure the broker looks at the local receiver credit when checking the
credit top-off threshold, and then tops credit off back to the high-water
mark, in sync with how client receivers manage their credit.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/34254095
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/34254095
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/34254095

Branch: refs/heads/master
Commit: 34254095c88b2fa41a65af5eb359f862e4c34496
Parents: 1e26a8a
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue Aug 28 12:15:01 2018 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue Aug 28 12:45:31 2018 -0400

----------------------------------------------------------------------
 .../amqp/broker/AMQPSessionCallback.java        |   7 +-
 .../amqp/broker/AMQPSessionCallbackTest.java    | 248 +++++++++++++++++++
 .../tests/integration/amqp/AmqpSenderTest.java  |   2 +-
 3 files changed, 254 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/34254095/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index f940c5a..b99a053 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -581,8 +581,11 @@ public class AMQPSessionCallback implements SessionCallback {
          Runnable creditRunnable = () -> {
             connection.lock();
             try {
-               if (receiver.getRemoteCredit() <= threshold) {
-                  receiver.flow(credits);
+               if (receiver.getCredit() <= threshold) {
+                  int topUp = credits - receiver.getCredit();
+                  if (topUp > 0) {
+                     receiver.flow(topUp);
+                  }
                }
             } finally {
                connection.unlock();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/34254095/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallbackTest.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallbackTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallbackTest.java
new file mode 100644
index 0000000..e86e960
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallbackTest.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.broker;
+
+import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.AMQP_CREDITS_DEFAULT;
+import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.AMQP_LOW_CREDITS_DEFAULT;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.never;
+
+import java.util.concurrent.Executor;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.paging.PagingManager;
+import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.persistence.OperationContext;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
+import org.apache.activemq.artemis.spi.core.remoting.Connection;
+import org.apache.qpid.proton.engine.Receiver;
+import org.junit.Rule;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
+import org.mockito.quality.Strictness;
+
+public class AMQPSessionCallbackTest {
+
+   @Rule public MockitoRule rule = MockitoJUnit.rule().strictness(Strictness.STRICT_STUBS);
+
+   @Mock private AMQPConnectionCallback protonSPI;
+   @Mock private ProtonProtocolManager manager;
+   @Mock private AMQPConnectionContext connection;
+   @Mock private Connection transportConnection;
+   @Mock private Executor executor;
+   @Mock private OperationContext operationContext;
+   @Mock private Receiver receiver;
+   @Mock private ActiveMQServer server;
+   @Mock private PagingManager pagingManager;
+   @Mock private PagingStore pagingStore;
+
+   /**
+    * Test that the AMQPSessionCallback grants no credit to an anonymous sender when credit is above the threshold
+    */
+   @Test
+   public void testOfferProducerWithNoAddressDoesNotTopOffCreditAboveThreshold() {
+      // Mock returns to get at the runnable that grants credit.
+      Mockito.when(manager.getServer()).thenReturn(server);
+      Mockito.when(server.getPagingManager()).thenReturn(pagingManager);
+
+      // Capture credit runnable and invoke to trigger credit top off
+      ArgumentCaptor<Runnable> argument = ArgumentCaptor.forClass(Runnable.class);
+      AMQPSessionCallback session = new AMQPSessionCallback(
+         protonSPI, manager, connection, transportConnection, executor, operationContext);
+
+      // Credit is above threshold
+      Mockito.when(receiver.getCredit()).thenReturn(AMQP_LOW_CREDITS_DEFAULT + 1);
+
+      session.offerProducerCredit(null, AMQP_CREDITS_DEFAULT, AMQP_LOW_CREDITS_DEFAULT, receiver);
+
+      // Run the credit refill code.
+      Mockito.verify(pagingManager).checkMemory(argument.capture());
+      assertNotNull(argument.getValue());
+      argument.getValue().run();
+
+      // Ensure we aren't looking at remote credit as that gives us the wrong view of what credit is at the broker
+      Mockito.verify(receiver, never()).getRemoteCredit();
+
+      // Credit runnable should not top off credit to configured value
+      Mockito.verify(receiver, never()).flow(anyInt());
+   }
+
+   /**
+    * Test that when at threshold the manager tops off credit for anonymous sender
+    */
+   @Test
+   public void testOfferProducerWithNoAddressTopsOffCreditAtThreshold() {
+      // Mock returns to get at the runnable that grants credit.
+      Mockito.when(manager.getServer()).thenReturn(server);
+      Mockito.when(server.getPagingManager()).thenReturn(pagingManager);
+
+      // Capture credit runnable and invoke to trigger credit top off
+      ArgumentCaptor<Runnable> argument = ArgumentCaptor.forClass(Runnable.class);
+      AMQPSessionCallback session = new AMQPSessionCallback(
+         protonSPI, manager, connection, transportConnection, executor, operationContext);
+
+      // Credit is at threshold
+      Mockito.when(receiver.getCredit()).thenReturn(AMQP_LOW_CREDITS_DEFAULT);
+
+      session.offerProducerCredit(null, AMQP_CREDITS_DEFAULT, AMQP_LOW_CREDITS_DEFAULT, receiver);
+
+      // Run the credit refill code.
+      Mockito.verify(pagingManager).checkMemory(argument.capture());
+      assertNotNull(argument.getValue());
+      argument.getValue().run();
+
+      // Ensure we aren't looking at remote credit as that gives us the wrong view of what credit is at the broker
+      Mockito.verify(receiver, never()).getRemoteCredit();
+
+      // Credit runnable should top off credit to configured value
+      Mockito.verify(receiver).flow(AMQP_CREDITS_DEFAULT - AMQP_LOW_CREDITS_DEFAULT);
+   }
+
+   /**
+    * Test that the AMQPSessionCallback grants no credit to a sender with an address when credit is above the threshold
+    */
+   @Test
+   public void testOfferProducerWithAddressDoesNotTopOffCreditAboveThreshold() throws Exception {
+      // Mock returns to get at the runnable that grants credit.
+      Mockito.when(manager.getServer()).thenReturn(server);
+      Mockito.when(server.getPagingManager()).thenReturn(pagingManager);
+      Mockito.when(pagingManager.getPageStore(any(SimpleString.class))).thenReturn(pagingStore);
+
+      // Capture credit runnable and invoke to trigger credit top off
+      ArgumentCaptor<Runnable> argument = ArgumentCaptor.forClass(Runnable.class);
+      AMQPSessionCallback session = new AMQPSessionCallback(
+         protonSPI, manager, connection, transportConnection, executor, operationContext);
+
+      // Credit is above threshold
+      Mockito.when(receiver.getCredit()).thenReturn(AMQP_LOW_CREDITS_DEFAULT + 1);
+
+      session.offerProducerCredit(new SimpleString("test"), AMQP_CREDITS_DEFAULT, AMQP_LOW_CREDITS_DEFAULT, receiver);
+
+      // Run the credit refill code.
+      Mockito.verify(pagingStore).checkMemory(argument.capture());
+      assertNotNull(argument.getValue());
+      argument.getValue().run();
+
+      // Ensure we aren't looking at remote credit as that gives us the wrong view of what credit is at the broker
+      Mockito.verify(receiver, never()).getRemoteCredit();
+
+      // Credit runnable should not top off credit to configured value
+      Mockito.verify(receiver, never()).flow(anyInt());
+   }
+
+   /**
+    * Test that when at threshold the manager tops off credit for sender
+    */
+   @Test
+   public void testOfferProducerWithAddressTopsOffCreditAtThreshold() throws Exception {
+      // Mock returns to get at the runnable that grants credit.
+      Mockito.when(manager.getServer()).thenReturn(server);
+      Mockito.when(server.getPagingManager()).thenReturn(pagingManager);
+      Mockito.when(pagingManager.getPageStore(any(SimpleString.class))).thenReturn(pagingStore);
+
+      // Capture credit runnable and invoke to trigger credit top off
+      ArgumentCaptor<Runnable> argument = ArgumentCaptor.forClass(Runnable.class);
+      AMQPSessionCallback session = new AMQPSessionCallback(
+         protonSPI, manager, connection, transportConnection, executor, operationContext);
+
+      // Credit is at threshold
+      Mockito.when(receiver.getCredit()).thenReturn(AMQP_LOW_CREDITS_DEFAULT);
+
+      session.offerProducerCredit(new SimpleString("test"), AMQP_CREDITS_DEFAULT, AMQP_LOW_CREDITS_DEFAULT, receiver);
+
+      // Run the credit refill code.
+      Mockito.verify(pagingStore).checkMemory(argument.capture());
+      assertNotNull(argument.getValue());
+      argument.getValue().run();
+
+      // Ensure we aren't looking at remote credit as that gives us the wrong view of what credit is at the broker
+      Mockito.verify(receiver, never()).getRemoteCredit();
+
+      // Credit runnable should top off credit to configured value
+      Mockito.verify(receiver).flow(AMQP_CREDITS_DEFAULT - AMQP_LOW_CREDITS_DEFAULT);
+   }
+
+   /**
+    * Test that the AMQPSessionCallback grants no credit to an anonymous sender when the computation results in negative credit
+    */
+   @Test
+   public void testOfferProducerWithNoAddressDoesNotGrantNegativeCredit() {
+      // Mock returns to get at the runnable that grants credit.
+      Mockito.when(manager.getServer()).thenReturn(server);
+      Mockito.when(server.getPagingManager()).thenReturn(pagingManager);
+
+      // Capture credit runnable and invoke to trigger credit top off
+      ArgumentCaptor<Runnable> argument = ArgumentCaptor.forClass(Runnable.class);
+      AMQPSessionCallback session = new AMQPSessionCallback(
+         protonSPI, manager, connection, transportConnection, executor, operationContext);
+
+      // Credit is at threshold
+      Mockito.when(receiver.getCredit()).thenReturn(AMQP_LOW_CREDITS_DEFAULT);
+
+      session.offerProducerCredit(null, 1, AMQP_LOW_CREDITS_DEFAULT, receiver);
+
+      // Run the credit refill code.
+      Mockito.verify(pagingManager).checkMemory(argument.capture());
+      assertNotNull(argument.getValue());
+      argument.getValue().run();
+
+      // Ensure we aren't looking at remote credit as that gives us the wrong view of what credit is at the broker
+      Mockito.verify(receiver, never()).getRemoteCredit();
+
+      // Credit runnable should not grant what would be negative credit here
+      Mockito.verify(receiver, never()).flow(anyInt());
+   }
+
+   /**
+    * Test that the AMQPSessionCallback grants no credit to a sender with an address when the computation results in negative credit
+    */
+   @Test
+   public void testOfferProducerWithAddressDoesNotGrantNegativeCredit() throws Exception {
+      // Mock returns to get at the runnable that grants credit.
+      Mockito.when(manager.getServer()).thenReturn(server);
+      Mockito.when(server.getPagingManager()).thenReturn(pagingManager);
+      Mockito.when(pagingManager.getPageStore(any(SimpleString.class))).thenReturn(pagingStore);
+
+      // Capture credit runnable and invoke to trigger credit top off
+      ArgumentCaptor<Runnable> argument = ArgumentCaptor.forClass(Runnable.class);
+      AMQPSessionCallback session = new AMQPSessionCallback(
+         protonSPI, manager, connection, transportConnection, executor, operationContext);
+
+      // Credit is at threshold
+      Mockito.when(receiver.getCredit()).thenReturn(AMQP_LOW_CREDITS_DEFAULT);
+
+      session.offerProducerCredit(new SimpleString("test"), 1, AMQP_LOW_CREDITS_DEFAULT, receiver);
+
+      // Run the credit refill code.
+      Mockito.verify(pagingStore).checkMemory(argument.capture());
+      assertNotNull(argument.getValue());
+      argument.getValue().run();
+
+      // Ensure we aren't looking at remote credit as that gives us the wrong view of what credit is at the broker
+      Mockito.verify(receiver, never()).getRemoteCredit();
+
+      // Credit runnable should not grant what would be negative credit here
+      Mockito.verify(receiver, never()).flow(anyInt());
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/34254095/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSenderTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSenderTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSenderTest.java
index e10bc7d..50d8828 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSenderTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSenderTest.java
@@ -199,7 +199,7 @@ public class AmqpSenderTest extends AmqpClientTestSupport {
                   initialCredit.countDown();
                   break;
                case 2:
-                  assertEquals("Unexpected replenished credit", AmqpSupport.AMQP_LOW_CREDITS_DEFAULT + AmqpSupport.AMQP_CREDITS_DEFAULT, sender.getCredit());
+                  assertEquals("Unexpected replenished credit", AmqpSupport.AMQP_CREDITS_DEFAULT, sender.getCredit());
                   refreshedCredit.countDown();
                   break;
                default:


[2/2] activemq-artemis git commit: ARTEMIS-2057: AMQP credit handling update. This closes #2272.

Posted by ro...@apache.org.
ARTEMIS-2057: AMQP credit handling update. This closes #2272.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/09f9159e
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/09f9159e
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/09f9159e

Branch: refs/heads/master
Commit: 09f9159eae1a649fc77af845bfec1e2365238cdc
Parents: 1e26a8a 3425409
Author: Robbie Gemmell <ro...@apache.org>
Authored: Tue Aug 28 18:04:31 2018 +0100
Committer: Robbie Gemmell <ro...@apache.org>
Committed: Tue Aug 28 18:04:31 2018 +0100

----------------------------------------------------------------------
 .../amqp/broker/AMQPSessionCallback.java        |   7 +-
 .../amqp/broker/AMQPSessionCallbackTest.java    | 248 +++++++++++++++++++
 .../tests/integration/amqp/AmqpSenderTest.java  |   2 +-
 3 files changed, 254 insertions(+), 3 deletions(-)
----------------------------------------------------------------------