You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2021/08/05 13:27:20 UTC

[activemq-artemis] branch main updated: ARTEMIS-3200 - remove braces from the belt and braces fix in ARTEMIS-2712, the braces are not necessary and leak, cleaning up in close negates the need for the session closeable

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new c2da0fd  ARTEMIS-3200 - remove braces from the belt and braces fix in ARTEMIS-2712, the braces are not necessary and leak, cleaning up in close negates the need for the session closeable
     new 969901a  This closes #3568
c2da0fd is described below

commit c2da0fd663c286f275ff090a1f87c16f70cb5653
Author: gtully <ga...@gmail.com>
AuthorDate: Wed May 5 12:39:51 2021 +0100

    ARTEMIS-3200 - remove braces from the belt and braces fix in ARTEMIS-2712, the braces are not necessary and leak, cleaning up in close negates the need for the session closeable
---
 .../protocol/amqp/broker/AMQPSessionCallback.java  | 11 ++++--
 .../amqp/proton/ProtonAbstractReceiver.java        | 13 +++----
 .../amqp/broker/AMQPSessionCallbackTest.java       | 17 +++++++++
 .../proton/ProtonServerReceiverContextTest.java    | 44 ++++++++++++++++++++++
 .../core/server/impl/ServerSessionImpl.java        |  5 +++
 .../tests/integration/amqp/AmqpSessionTest.java    | 28 ++++++++++++++
 6 files changed, 106 insertions(+), 12 deletions(-)

diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index 82dbec7..2f5fa01 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -19,7 +19,6 @@ package org.apache.activemq.artemis.protocol.amqp.broker;
 import java.util.Map;
 import java.util.concurrent.Executor;
 
-import org.apache.activemq.artemis.Closeable;
 import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
 import org.apache.activemq.artemis.api.core.ActiveMQAddressExistsException;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
@@ -160,9 +159,6 @@ public class AMQPSessionCallback implements SessionCallback {
 
    }
 
-   public void addCloseable(Closeable closeable) {
-      serverSession.addCloseable(closeable);
-   }
 
    public void withinContext(Runnable run) throws Exception {
       OperationContext context = recoverContext();
@@ -434,6 +430,13 @@ public class AMQPSessionCallback implements SessionCallback {
       }
    }
 
+   @Override
+   public void close(boolean failed) {
+      if (protonSession != null) {
+         protonSession.close();
+      }
+   }
+
    public void ack(Transaction transaction, Object brokerConsumer, Message message) throws Exception {
       if (transaction == null) {
          transaction = serverSession.getCurrentTransaction();
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonAbstractReceiver.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonAbstractReceiver.java
index 74901b7..e0146c0 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonAbstractReceiver.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonAbstractReceiver.java
@@ -88,9 +88,6 @@ public abstract class ProtonAbstractReceiver extends ProtonInitializable impleme
       this.creditRunnable = createCreditRunnable(amqpCredits, minCreditRefresh, receiver, connection, this);
       useModified = this.connection.getProtocolManager().isUseModifiedForTransientDeliveryErrors();
       this.routingContext = new RoutingContextImpl(null).setDuplicateDetection(connection.getProtocolManager().isAmqpDuplicateDetection());
-      if (sessionSPI != null) {
-         sessionSPI.addCloseable((boolean failed) -> clearLargeMessage());
-      }
    }
 
    protected void recoverContext() {
@@ -137,8 +134,8 @@ public abstract class ProtonAbstractReceiver extends ProtonInitializable impleme
    }
    /**
     * The reason why we use the AtomicRunnable here
-    * is because PagingManager will call Runnables in case it was blocked.
-    * however it could call many Runnables
+    * is because PagingManager will call the Runnable in case it was blocked.
+    * However, it could call many Runnables,
     *  and this serves as a control to avoid duplicated calls
     * */
    static class FlowControlRunner implements Runnable {
@@ -178,10 +175,10 @@ public abstract class ProtonAbstractReceiver extends ProtonInitializable impleme
       }
    }
 
-   public int incrementSettle() {
+   public void incrementSettle() {
       assert pendingSettles >= 0;
       connection.requireInHandler();
-      return pendingSettles++;
+      pendingSettles++;
    }
 
    public void settle(Delivery settlement) {
@@ -289,13 +286,13 @@ public abstract class ProtonAbstractReceiver extends ProtonInitializable impleme
    @Override
    public void close(boolean remoteLinkClose) throws ActiveMQAMQPException {
       protonSession.removeReceiver(receiver);
+      clearLargeMessage();
    }
 
    @Override
    public void close(ErrorCondition condition) throws ActiveMQAMQPException {
       receiver.setCondition(condition);
       close(false);
-      clearLargeMessage();
    }
 
    protected abstract void actualDelivery(AMQPMessage message, Delivery delivery, Receiver receiver, Transaction tx);
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallbackTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallbackTest.java
index 1184c13..a902a26 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallbackTest.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallbackTest.java
@@ -24,6 +24,7 @@ import org.apache.activemq.artemis.core.paging.PagingStore;
 import org.apache.activemq.artemis.core.persistence.OperationContext;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
+import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
 import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerReceiverContext;
 import org.apache.activemq.artemis.spi.core.remoting.Connection;
 import org.apache.qpid.proton.engine.Receiver;
@@ -258,4 +259,20 @@ public class AMQPSessionCallbackTest {
       // Credit runnable should not grant what would be negative credit here
       Mockito.verify(receiver, never()).flow(anyInt());
    }
+
+   @Test
+   public void testCloseBoolCallsProtonSessionClose() throws Exception {
+      Mockito.reset(connection);
+      Mockito.when(manager.getServer()).thenReturn(server);
+
+      // NOTE: the captor below is unused; this test only verifies that close(false) reaches protonSession.close()
+      ArgumentCaptor<Runnable> argument = ArgumentCaptor.forClass(Runnable.class);
+      AMQPSessionCallback session = new AMQPSessionCallback(protonSPI, manager, connection, transportConnection, executor, operationContext);
+
+      AMQPSessionContext protonSession = Mockito.mock(AMQPSessionContext.class);
+      session.init(protonSession, null);
+      session.close(false);
+
+      Mockito.verify(protonSession).close();
+   }
 }
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContextTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContextTest.java
index 491651b..696f9c5 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContextTest.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContextTest.java
@@ -35,6 +35,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.activemq.artemis.api.core.ActiveMQAddressFullException;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
 import org.apache.activemq.artemis.core.server.RoutingContext;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
@@ -89,6 +90,49 @@ public class ProtonServerReceiverContextTest {
       doOnMessageWithDeliveryException(singletonList(Accepted.DESCRIPTOR_SYMBOL), null, new ActiveMQException(), Rejected.class);
    }
 
+   @Test
+   public void testClearLargeOnClose() throws Exception {
+      Receiver mockReceiver = mock(Receiver.class);
+      AMQPConnectionContext mockConnContext = mock(AMQPConnectionContext.class);
+
+      when(mockConnContext.getAmqpCredits()).thenReturn(100);
+      when(mockConnContext.getAmqpLowCredits()).thenReturn(30);
+
+      when(mockConnContext.getProtocolManager()).thenReturn(mock(ProtonProtocolManager.class));
+
+      AMQPSessionCallback mockSessionSpi = mock(AMQPSessionCallback.class);
+      when(mockSessionSpi.getStorageManager()).thenReturn(new NullStorageManager());
+
+      AMQPSessionContext mockProtonContext = mock(AMQPSessionContext.class);
+
+      AtomicInteger clearLargeMessage = new AtomicInteger(0);
+      ProtonServerReceiverContext rc = new ProtonServerReceiverContext(mockSessionSpi, mockConnContext, mockProtonContext, mockReceiver) {
+         @Override
+         protected void clearLargeMessage() {
+            super.clearLargeMessage();
+            clearLargeMessage.incrementAndGet();
+         }
+      };
+
+      Delivery mockDelivery = mock(Delivery.class);
+      when(mockDelivery.isAborted()).thenReturn(false);
+      when(mockDelivery.isPartial()).thenReturn(false);
+      when(mockDelivery.getLink()).thenReturn(mockReceiver);
+
+      when(mockReceiver.current()).thenReturn(mockDelivery);
+
+      rc.onMessage(mockDelivery);
+
+      rc.close(true);
+
+      verify(mockReceiver, times(1)).current();
+      verify(mockReceiver, times(1)).advance();
+
+      Assert.assertTrue(clearLargeMessage.get() > 0);
+
+   }
+
+
    private void doOnMessageWithAbortedDeliveryTestImpl(boolean drain) throws ActiveMQAMQPException {
       Receiver mockReceiver = mock(Receiver.class);
       AMQPConnectionContext mockConnContext = mock(AMQPConnectionContext.class);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index 8f7ed0e..dfc7771 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -305,6 +305,11 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
       this.closeables.add(closeable);
    }
 
+   // for testing
+   public final Set<Closeable> getCloseables() {
+      return closeables;
+   }
+
    public Map<SimpleString, TempQueueCleanerUpper> getTempQueueCleanUppers() {
       return tempQueueCleannerUppers;
    }
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSessionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSessionTest.java
index b16eafa..cd57ff0 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSessionTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSessionTest.java
@@ -16,13 +16,17 @@
  */
 package org.apache.activemq.artemis.tests.integration.amqp;
 
+import org.apache.activemq.artemis.core.server.ServerSession;
+import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl;
 import org.apache.activemq.transport.amqp.client.AmqpClient;
 import org.apache.activemq.transport.amqp.client.AmqpConnection;
 import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
 import org.apache.activemq.transport.amqp.client.AmqpSession;
 import org.apache.activemq.transport.amqp.client.AmqpValidator;
 import org.apache.qpid.proton.engine.Receiver;
 import org.apache.qpid.proton.engine.Session;
+import org.junit.Assert;
 import org.junit.Test;
 
 public class AmqpSessionTest extends AmqpClientTestSupport {
@@ -71,4 +75,28 @@ public class AmqpSessionTest extends AmqpClientTestSupport {
       connection.getStateInspector().assertValid();
       connection.close();
    }
+
+
+   @Test(timeout = 60000)
+   public void testCreateSessionProducerConsumerDoesNotLeakClosable() throws Exception {
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+      assertNotNull(session);
+
+      for (int i = 0; i < 10; i++) {
+         AmqpReceiver receiver = session.createReceiver(getQueueName());
+         AmqpSender sender = session.createSender(getQueueName());
+         receiver.close();
+         sender.close();
+      }
+
+      assertEquals(1, server.getSessions().size());
+      for (ServerSession serverSession : server.getSessions()) {
+         Assert.assertNull( ((ServerSessionImpl) serverSession).getCloseables());
+      }
+
+      connection.close();
+   }
+
 }