You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2021/08/05 13:27:20 UTC
[activemq-artemis] branch main updated: ARTEMIS-3200 - remove
braces from the belt and braces fix in ARTEMIS-2712,
the braces are not necessary and leak,
cleaning up in close negates the need to the session closeable
This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new c2da0fd ARTEMIS-3200 - remove braces from the belt and braces fix in ARTEMIS-2712, the braces are not necessary and leak, cleaning up in close negates the need to the session closeable
new 969901a This closes #3568
c2da0fd is described below
commit c2da0fd663c286f275ff090a1f87c16f70cb5653
Author: gtully <ga...@gmail.com>
AuthorDate: Wed May 5 12:39:51 2021 +0100
ARTEMIS-3200 - remove braces from the belt and braces fix in ARTEMIS-2712, the braces are not necessary and leak, cleaning up in close negates the need to the session closeable
---
.../protocol/amqp/broker/AMQPSessionCallback.java | 11 ++++--
.../amqp/proton/ProtonAbstractReceiver.java | 13 +++----
.../amqp/broker/AMQPSessionCallbackTest.java | 17 +++++++++
.../proton/ProtonServerReceiverContextTest.java | 44 ++++++++++++++++++++++
.../core/server/impl/ServerSessionImpl.java | 5 +++
.../tests/integration/amqp/AmqpSessionTest.java | 28 ++++++++++++++
6 files changed, 106 insertions(+), 12 deletions(-)
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index 82dbec7..2f5fa01 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -19,7 +19,6 @@ package org.apache.activemq.artemis.protocol.amqp.broker;
import java.util.Map;
import java.util.concurrent.Executor;
-import org.apache.activemq.artemis.Closeable;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQAddressExistsException;
import org.apache.activemq.artemis.api.core.ActiveMQException;
@@ -160,9 +159,6 @@ public class AMQPSessionCallback implements SessionCallback {
}
- public void addCloseable(Closeable closeable) {
- serverSession.addCloseable(closeable);
- }
public void withinContext(Runnable run) throws Exception {
OperationContext context = recoverContext();
@@ -434,6 +430,13 @@ public class AMQPSessionCallback implements SessionCallback {
}
}
+ @Override
+ public void close(boolean failed) {
+ if (protonSession != null) {
+ protonSession.close();
+ }
+ }
+
public void ack(Transaction transaction, Object brokerConsumer, Message message) throws Exception {
if (transaction == null) {
transaction = serverSession.getCurrentTransaction();
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonAbstractReceiver.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonAbstractReceiver.java
index 74901b7..e0146c0 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonAbstractReceiver.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonAbstractReceiver.java
@@ -88,9 +88,6 @@ public abstract class ProtonAbstractReceiver extends ProtonInitializable impleme
this.creditRunnable = createCreditRunnable(amqpCredits, minCreditRefresh, receiver, connection, this);
useModified = this.connection.getProtocolManager().isUseModifiedForTransientDeliveryErrors();
this.routingContext = new RoutingContextImpl(null).setDuplicateDetection(connection.getProtocolManager().isAmqpDuplicateDetection());
- if (sessionSPI != null) {
- sessionSPI.addCloseable((boolean failed) -> clearLargeMessage());
- }
}
protected void recoverContext() {
@@ -137,8 +134,8 @@ public abstract class ProtonAbstractReceiver extends ProtonInitializable impleme
}
/**
* The reason why we use the AtomicRunnable here
- * is because PagingManager will call Runnables in case it was blocked.
- * however it could call many Runnables
+ * is because PagingManager will call Runnable in case it was blocked.
+ * however it could call many Runnable
* and this serves as a control to avoid duplicated calls
* */
static class FlowControlRunner implements Runnable {
@@ -178,10 +175,10 @@ public abstract class ProtonAbstractReceiver extends ProtonInitializable impleme
}
}
- public int incrementSettle() {
+ public void incrementSettle() {
assert pendingSettles >= 0;
connection.requireInHandler();
- return pendingSettles++;
+ pendingSettles++;
}
public void settle(Delivery settlement) {
@@ -289,13 +286,13 @@ public abstract class ProtonAbstractReceiver extends ProtonInitializable impleme
@Override
public void close(boolean remoteLinkClose) throws ActiveMQAMQPException {
protonSession.removeReceiver(receiver);
+ clearLargeMessage();
}
@Override
public void close(ErrorCondition condition) throws ActiveMQAMQPException {
receiver.setCondition(condition);
close(false);
- clearLargeMessage();
}
protected abstract void actualDelivery(AMQPMessage message, Delivery delivery, Receiver receiver, Transaction tx);
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallbackTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallbackTest.java
index 1184c13..a902a26 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallbackTest.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallbackTest.java
@@ -24,6 +24,7 @@ import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
+import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerReceiverContext;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.qpid.proton.engine.Receiver;
@@ -258,4 +259,20 @@ public class AMQPSessionCallbackTest {
// Credit runnable should not grant what would be negative credit here
Mockito.verify(receiver, never()).flow(anyInt());
}
+
+ @Test
+ public void testCloseBoolCallsProtonSessionClose() throws Exception {
+ Mockito.reset(connection);
+ Mockito.when(manager.getServer()).thenReturn(server);
+
+ // Capture credit runnable and invoke to trigger credit top off
+ ArgumentCaptor<Runnable> argument = ArgumentCaptor.forClass(Runnable.class);
+ AMQPSessionCallback session = new AMQPSessionCallback(protonSPI, manager, connection, transportConnection, executor, operationContext);
+
+ AMQPSessionContext protonSession = Mockito.mock(AMQPSessionContext.class);
+ session.init(protonSession, null);
+ session.close(false);
+
+ Mockito.verify(protonSession).close();
+ }
}
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContextTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContextTest.java
index 491651b..696f9c5 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContextTest.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContextTest.java
@@ -35,6 +35,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.ActiveMQAddressFullException;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
@@ -89,6 +90,49 @@ public class ProtonServerReceiverContextTest {
doOnMessageWithDeliveryException(singletonList(Accepted.DESCRIPTOR_SYMBOL), null, new ActiveMQException(), Rejected.class);
}
+ @Test
+ public void testClearLargeOnClose() throws Exception {
+ Receiver mockReceiver = mock(Receiver.class);
+ AMQPConnectionContext mockConnContext = mock(AMQPConnectionContext.class);
+
+ when(mockConnContext.getAmqpCredits()).thenReturn(100);
+ when(mockConnContext.getAmqpLowCredits()).thenReturn(30);
+
+ when(mockConnContext.getProtocolManager()).thenReturn(mock(ProtonProtocolManager.class));
+
+ AMQPSessionCallback mockSessionSpi = mock(AMQPSessionCallback.class);
+ when(mockSessionSpi.getStorageManager()).thenReturn(new NullStorageManager());
+
+ AMQPSessionContext mockProtonContext = mock(AMQPSessionContext.class);
+
+ AtomicInteger clearLargeMessage = new AtomicInteger(0);
+ ProtonServerReceiverContext rc = new ProtonServerReceiverContext(mockSessionSpi, mockConnContext, mockProtonContext, mockReceiver) {
+ @Override
+ protected void clearLargeMessage() {
+ super.clearLargeMessage();
+ clearLargeMessage.incrementAndGet();
+ }
+ };
+
+ Delivery mockDelivery = mock(Delivery.class);
+ when(mockDelivery.isAborted()).thenReturn(false);
+ when(mockDelivery.isPartial()).thenReturn(false);
+ when(mockDelivery.getLink()).thenReturn(mockReceiver);
+
+ when(mockReceiver.current()).thenReturn(mockDelivery);
+
+ rc.onMessage(mockDelivery);
+
+ rc.close(true);
+
+ verify(mockReceiver, times(1)).current();
+ verify(mockReceiver, times(1)).advance();
+
+ Assert.assertTrue(clearLargeMessage.get() > 0);
+
+ }
+
+
private void doOnMessageWithAbortedDeliveryTestImpl(boolean drain) throws ActiveMQAMQPException {
Receiver mockReceiver = mock(Receiver.class);
AMQPConnectionContext mockConnContext = mock(AMQPConnectionContext.class);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index 8f7ed0e..dfc7771 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -305,6 +305,11 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
this.closeables.add(closeable);
}
+ // for testing
+ public final Set<Closeable> getCloseables() {
+ return closeables;
+ }
+
public Map<SimpleString, TempQueueCleanerUpper> getTempQueueCleanUppers() {
return tempQueueCleannerUppers;
}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSessionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSessionTest.java
index b16eafa..cd57ff0 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSessionTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSessionTest.java
@@ -16,13 +16,17 @@
*/
package org.apache.activemq.artemis.tests.integration.amqp;
+import org.apache.activemq.artemis.core.server.ServerSession;
+import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.activemq.transport.amqp.client.AmqpValidator;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Session;
+import org.junit.Assert;
import org.junit.Test;
public class AmqpSessionTest extends AmqpClientTestSupport {
@@ -71,4 +75,28 @@ public class AmqpSessionTest extends AmqpClientTestSupport {
connection.getStateInspector().assertValid();
connection.close();
}
+
+
+ @Test(timeout = 60000)
+ public void testCreateSessionProducerConsumerDoesNotLeakClosable() throws Exception {
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+ AmqpSession session = connection.createSession();
+ assertNotNull(session);
+
+ for (int i = 0; i < 10; i++) {
+ AmqpReceiver receiver = session.createReceiver(getQueueName());
+ AmqpSender sender = session.createSender(getQueueName());
+ receiver.close();
+ sender.close();
+ }
+
+ assertEquals(1, server.getSessions().size());
+ for (ServerSession serverSession : server.getSessions()) {
+ Assert.assertNull( ((ServerSessionImpl) serverSession).getCloseables());
+ }
+
+ connection.close();
+ }
+
}