You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ro...@apache.org on 2018/08/28 17:06:18 UTC
[1/2] activemq-artemis git commit: ARTEMIS-2057 Fix runaway credit grants
Repository: activemq-artemis
Updated Branches:
refs/heads/master 1e26a8a2e -> 09f9159ea
ARTEMIS-2057 Fix runaway credit grants
Ensure the broker looks at the local receiver credit when checking against
the credit top-off threshold, and then tops the credit back up to the high
water mark, in sync with how client receivers manage their credit.
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/34254095
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/34254095
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/34254095
Branch: refs/heads/master
Commit: 34254095c88b2fa41a65af5eb359f862e4c34496
Parents: 1e26a8a
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue Aug 28 12:15:01 2018 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue Aug 28 12:45:31 2018 -0400
----------------------------------------------------------------------
.../amqp/broker/AMQPSessionCallback.java | 7 +-
.../amqp/broker/AMQPSessionCallbackTest.java | 248 +++++++++++++++++++
.../tests/integration/amqp/AmqpSenderTest.java | 2 +-
3 files changed, 254 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/34254095/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index f940c5a..b99a053 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -581,8 +581,11 @@ public class AMQPSessionCallback implements SessionCallback {
Runnable creditRunnable = () -> {
connection.lock();
try {
- if (receiver.getRemoteCredit() <= threshold) {
- receiver.flow(credits);
+ if (receiver.getCredit() <= threshold) {
+ int topUp = credits - receiver.getCredit();
+ if (topUp > 0) {
+ receiver.flow(topUp);
+ }
}
} finally {
connection.unlock();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/34254095/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallbackTest.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallbackTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallbackTest.java
new file mode 100644
index 0000000..e86e960
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallbackTest.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.broker;
+
+import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.AMQP_CREDITS_DEFAULT;
+import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.AMQP_LOW_CREDITS_DEFAULT;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.never;
+
+import java.util.concurrent.Executor;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.paging.PagingManager;
+import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.persistence.OperationContext;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
+import org.apache.activemq.artemis.spi.core.remoting.Connection;
+import org.apache.qpid.proton.engine.Receiver;
+import org.junit.Rule;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
+import org.mockito.quality.Strictness;
+
+public class AMQPSessionCallbackTest {
+
+ @Rule public MockitoRule rule = MockitoJUnit.rule().strictness(Strictness.STRICT_STUBS);
+
+ @Mock private AMQPConnectionCallback protonSPI;
+ @Mock private ProtonProtocolManager manager;
+ @Mock private AMQPConnectionContext connection;
+ @Mock private Connection transportConnection;
+ @Mock private Executor executor;
+ @Mock private OperationContext operationContext;
+ @Mock private Receiver receiver;
+ @Mock private ActiveMQServer server;
+ @Mock private PagingManager pagingManager;
+ @Mock private PagingStore pagingStore;
+
+ /**
+ * Test that the AMQPSessionCallback grants no credit to an anonymous sender when the receiver's credit is above the threshold
+ */
+ @Test
+ public void testOfferProducerWithNoAddressDoesNotTopOffCreditAboveThreshold() {
+ // Mock returns to get at the runnable that grants credit.
+ Mockito.when(manager.getServer()).thenReturn(server);
+ Mockito.when(server.getPagingManager()).thenReturn(pagingManager);
+
+ // Capture credit runnable and invoke to trigger credit top off
+ ArgumentCaptor<Runnable> argument = ArgumentCaptor.forClass(Runnable.class);
+ AMQPSessionCallback session = new AMQPSessionCallback(
+ protonSPI, manager, connection, transportConnection, executor, operationContext);
+
+ // Credit is above threshold
+ Mockito.when(receiver.getCredit()).thenReturn(AMQP_LOW_CREDITS_DEFAULT + 1);
+
+ session.offerProducerCredit(null, AMQP_CREDITS_DEFAULT, AMQP_LOW_CREDITS_DEFAULT, receiver);
+
+ // Run the credit refill code.
+ Mockito.verify(pagingManager).checkMemory(argument.capture());
+ assertNotNull(argument.getValue());
+ argument.getValue().run();
+
+ // Ensure we aren't looking at remote credit as that gives us the wrong view of what credit is at the broker
+ Mockito.verify(receiver, never()).getRemoteCredit();
+
+ // Credit runnable should not top off credit to configured value
+ Mockito.verify(receiver, never()).flow(anyInt());
+ }
+
+ /**
+ * Test that when credit is at the threshold the AMQPSessionCallback tops off credit for an anonymous sender
+ */
+ @Test
+ public void testOfferProducerWithNoAddressTopsOffCreditAtThreshold() {
+ // Mock returns to get at the runnable that grants credit.
+ Mockito.when(manager.getServer()).thenReturn(server);
+ Mockito.when(server.getPagingManager()).thenReturn(pagingManager);
+
+ // Capture credit runnable and invoke to trigger credit top off
+ ArgumentCaptor<Runnable> argument = ArgumentCaptor.forClass(Runnable.class);
+ AMQPSessionCallback session = new AMQPSessionCallback(
+ protonSPI, manager, connection, transportConnection, executor, operationContext);
+
+ // Credit is at threshold
+ Mockito.when(receiver.getCredit()).thenReturn(AMQP_LOW_CREDITS_DEFAULT);
+
+ session.offerProducerCredit(null, AMQP_CREDITS_DEFAULT, AMQP_LOW_CREDITS_DEFAULT, receiver);
+
+ // Run the credit refill code.
+ Mockito.verify(pagingManager).checkMemory(argument.capture());
+ assertNotNull(argument.getValue());
+ argument.getValue().run();
+
+ // Ensure we aren't looking at remote credit as that gives us the wrong view of what credit is at the broker
+ Mockito.verify(receiver, never()).getRemoteCredit();
+
+ // Credit runnable should top off credit to configured value
+ Mockito.verify(receiver).flow(AMQP_CREDITS_DEFAULT - AMQP_LOW_CREDITS_DEFAULT);
+ }
+
+ /**
+ * Test that the AMQPSessionCallback grants no credit to an addressed sender when the receiver's credit is above the threshold
+ */
+ @Test
+ public void testOfferProducerWithAddressDoesNotTopOffCreditAboveThreshold() throws Exception {
+ // Mock returns to get at the runnable that grants credit.
+ Mockito.when(manager.getServer()).thenReturn(server);
+ Mockito.when(server.getPagingManager()).thenReturn(pagingManager);
+ Mockito.when(pagingManager.getPageStore(any(SimpleString.class))).thenReturn(pagingStore);
+
+ // Capture credit runnable and invoke to trigger credit top off
+ ArgumentCaptor<Runnable> argument = ArgumentCaptor.forClass(Runnable.class);
+ AMQPSessionCallback session = new AMQPSessionCallback(
+ protonSPI, manager, connection, transportConnection, executor, operationContext);
+
+ // Credit is above threshold
+ Mockito.when(receiver.getCredit()).thenReturn(AMQP_LOW_CREDITS_DEFAULT + 1);
+
+ session.offerProducerCredit(new SimpleString("test"), AMQP_CREDITS_DEFAULT, AMQP_LOW_CREDITS_DEFAULT, receiver);
+
+ // Run the credit refill code.
+ Mockito.verify(pagingStore).checkMemory(argument.capture());
+ assertNotNull(argument.getValue());
+ argument.getValue().run();
+
+ // Ensure we aren't looking at remote credit as that gives us the wrong view of what credit is at the broker
+ Mockito.verify(receiver, never()).getRemoteCredit();
+
+ // Credit runnable should not top off credit to configured value
+ Mockito.verify(receiver, never()).flow(anyInt());
+ }
+
+ /**
+ * Test that when credit is at the threshold the AMQPSessionCallback tops off credit for an addressed sender
+ */
+ @Test
+ public void testOfferProducerWithAddressTopsOffCreditAtThreshold() throws Exception {
+ // Mock returns to get at the runnable that grants credit.
+ Mockito.when(manager.getServer()).thenReturn(server);
+ Mockito.when(server.getPagingManager()).thenReturn(pagingManager);
+ Mockito.when(pagingManager.getPageStore(any(SimpleString.class))).thenReturn(pagingStore);
+
+ // Capture credit runnable and invoke to trigger credit top off
+ ArgumentCaptor<Runnable> argument = ArgumentCaptor.forClass(Runnable.class);
+ AMQPSessionCallback session = new AMQPSessionCallback(
+ protonSPI, manager, connection, transportConnection, executor, operationContext);
+
+ // Credit is at threshold
+ Mockito.when(receiver.getCredit()).thenReturn(AMQP_LOW_CREDITS_DEFAULT);
+
+ session.offerProducerCredit(new SimpleString("test"), AMQP_CREDITS_DEFAULT, AMQP_LOW_CREDITS_DEFAULT, receiver);
+
+ // Run the credit refill code.
+ Mockito.verify(pagingStore).checkMemory(argument.capture());
+ assertNotNull(argument.getValue());
+ argument.getValue().run();
+
+ // Ensure we aren't looking at remote credit as that gives us the wrong view of what credit is at the broker
+ Mockito.verify(receiver, never()).getRemoteCredit();
+
+ // Credit runnable should top off credit to configured value
+ Mockito.verify(receiver).flow(AMQP_CREDITS_DEFAULT - AMQP_LOW_CREDITS_DEFAULT);
+ }
+
+ /**
+ * Test that the AMQPSessionCallback grants no credit when the computation results in negative credit
+ */
+ @Test
+ public void testOfferProducerWithNoAddressDoesNotGrantNegativeCredit() {
+ // Mock returns to get at the runnable that grants credit.
+ Mockito.when(manager.getServer()).thenReturn(server);
+ Mockito.when(server.getPagingManager()).thenReturn(pagingManager);
+
+ // Capture credit runnable and invoke to trigger credit top off
+ ArgumentCaptor<Runnable> argument = ArgumentCaptor.forClass(Runnable.class);
+ AMQPSessionCallback session = new AMQPSessionCallback(
+ protonSPI, manager, connection, transportConnection, executor, operationContext);
+
+ // Credit is at threshold
+ Mockito.when(receiver.getCredit()).thenReturn(AMQP_LOW_CREDITS_DEFAULT);
+
+ session.offerProducerCredit(null, 1, AMQP_LOW_CREDITS_DEFAULT, receiver);
+
+ // Run the credit refill code.
+ Mockito.verify(pagingManager).checkMemory(argument.capture());
+ assertNotNull(argument.getValue());
+ argument.getValue().run();
+
+ // Ensure we aren't looking at remote credit as that gives us the wrong view of what credit is at the broker
+ Mockito.verify(receiver, never()).getRemoteCredit();
+
+ // Credit runnable should not grant what would be negative credit here
+ Mockito.verify(receiver, never()).flow(anyInt());
+ }
+
+ /**
+ * Test that the AMQPSessionCallback grants no credit when the computation results in negative credit
+ */
+ @Test
+ public void testOfferProducerWithAddressDoesNotGrantNegativeCredit() throws Exception {
+ // Mock returns to get at the runnable that grants credit.
+ Mockito.when(manager.getServer()).thenReturn(server);
+ Mockito.when(server.getPagingManager()).thenReturn(pagingManager);
+ Mockito.when(pagingManager.getPageStore(any(SimpleString.class))).thenReturn(pagingStore);
+
+ // Capture credit runnable and invoke to trigger credit top off
+ ArgumentCaptor<Runnable> argument = ArgumentCaptor.forClass(Runnable.class);
+ AMQPSessionCallback session = new AMQPSessionCallback(
+ protonSPI, manager, connection, transportConnection, executor, operationContext);
+
+ // Credit is at threshold
+ Mockito.when(receiver.getCredit()).thenReturn(AMQP_LOW_CREDITS_DEFAULT);
+
+ session.offerProducerCredit(new SimpleString("test"), 1, AMQP_LOW_CREDITS_DEFAULT, receiver);
+
+ // Run the credit refill code.
+ Mockito.verify(pagingStore).checkMemory(argument.capture());
+ assertNotNull(argument.getValue());
+ argument.getValue().run();
+
+ // Ensure we aren't looking at remote credit as that gives us the wrong view of what credit is at the broker
+ Mockito.verify(receiver, never()).getRemoteCredit();
+
+ // Credit runnable should not grant what would be negative credit here
+ Mockito.verify(receiver, never()).flow(anyInt());
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/34254095/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSenderTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSenderTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSenderTest.java
index e10bc7d..50d8828 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSenderTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSenderTest.java
@@ -199,7 +199,7 @@ public class AmqpSenderTest extends AmqpClientTestSupport {
initialCredit.countDown();
break;
case 2:
- assertEquals("Unexpected replenished credit", AmqpSupport.AMQP_LOW_CREDITS_DEFAULT + AmqpSupport.AMQP_CREDITS_DEFAULT, sender.getCredit());
+ assertEquals("Unexpected replenished credit", AmqpSupport.AMQP_CREDITS_DEFAULT, sender.getCredit());
refreshedCredit.countDown();
break;
default:
[2/2] activemq-artemis git commit: ARTEMIS-2057: AMQP credit handling update. This closes #2272.
Posted by ro...@apache.org.
ARTEMIS-2057: AMQP credit handling update. This closes #2272.
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/09f9159e
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/09f9159e
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/09f9159e
Branch: refs/heads/master
Commit: 09f9159eae1a649fc77af845bfec1e2365238cdc
Parents: 1e26a8a 3425409
Author: Robbie Gemmell <ro...@apache.org>
Authored: Tue Aug 28 18:04:31 2018 +0100
Committer: Robbie Gemmell <ro...@apache.org>
Committed: Tue Aug 28 18:04:31 2018 +0100
----------------------------------------------------------------------
.../amqp/broker/AMQPSessionCallback.java | 7 +-
.../amqp/broker/AMQPSessionCallbackTest.java | 248 +++++++++++++++++++
.../tests/integration/amqp/AmqpSenderTest.java | 2 +-
3 files changed, 254 insertions(+), 3 deletions(-)
----------------------------------------------------------------------