You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2022/03/15 12:56:36 UTC

[activemq-artemis] branch main updated: ARTEMIS-3721 AMQP Mirrored Large Message file not removed

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 861fe59  ARTEMIS-3721 AMQP Mirrored Large Message file not removed
861fe59 is described below

commit 861fe59124732420dc9a66f292aa4ada968e103a
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Mon Mar 14 22:36:24 2022 -0400

    ARTEMIS-3721 AMQP Mirrored Large Message file not removed
---
 .../connect/mirror/AMQPMirrorControllerSource.java |  2 -
 .../integration/amqp/connect/BrokerInSyncTest.java | 92 ++++++++++++++++++++++
 2 files changed, 92 insertions(+), 2 deletions(-)

diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
index b6d781d..6814aae 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
@@ -218,8 +218,6 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
          }
          snfQueue.refUp(ref);
          refs.add(ref);
-         message.usageUp();
-
 
          if (message.isDurable() && snfQueue.isDurable()) {
             PostOfficeImpl.storeDurableReference(server.getStorageManager(), message, context.getTransaction(), snfQueue, true);
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/BrokerInSyncTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/BrokerInSyncTest.java
index 323fdfd..ade8bab 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/BrokerInSyncTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/BrokerInSyncTest.java
@@ -60,6 +60,20 @@ public class BrokerInSyncTest extends AmqpClientTestSupport {
    private static final Logger logger = Logger.getLogger(BrokerInSyncTest.class);
    ActiveMQServer server_2;
 
+   @After
+   public void stopServer1() throws Exception {
+      if (server != null) {
+         server.stop();
+      }
+   }
+
+   @After
+   public void stopServer2() throws Exception {
+      if (server_2 != null) {
+         server_2.stop();
+      }
+   }
+
    @Before
    public void startLogging() {
       AssertionLoggerHandler.startCapture();
@@ -422,4 +436,82 @@ public class BrokerInSyncTest extends AmqpClientTestSupport {
       server.stop();
    }
 
+   @Test
+   public void testLargeMessageInSync() throws Exception {
+      String queueName = "testSyncLargeMessage";
+      server.setIdentity("Server1");
+      {
+         AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("connectTowardsServer2", "tcp://localhost:" + AMQP_PORT_2).setReconnectAttempts(3).setRetryInterval(100);
+         amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setDurable(true));
+         server.getConfiguration().addAMQPConnection(amqpConnection);
+      }
+      server.start();
+
+      server_2 = createServer(AMQP_PORT_2, false);
+      server_2.setIdentity("Server2");
+
+      {
+         AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("connectTowardsServer1", "tcp://localhost:" + AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100);
+         amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setDurable(true));
+         server_2.getConfiguration().addAMQPConnection(amqpConnection);
+      }
+
+      server_2.start();
+
+      server_2.addAddressInfo(new AddressInfo(queueName).setAutoCreated(false).addRoutingType(RoutingType.ANYCAST));
+      server_2.createQueue(new QueueConfiguration(queueName).setDurable(true).setRoutingType(RoutingType.ANYCAST));
+
+      Wait.assertTrue(() -> server_2.locateQueue(queueName) != null);
+      Wait.assertTrue(() -> server.locateQueue(queueName) != null);
+
+      String bigString;
+      {
+         StringBuffer bigStringBuffer = new StringBuffer();
+         while (bigStringBuffer.length() < 200 * 1024) {
+            bigStringBuffer.append("This is a big string ");
+         }
+         bigString = bigStringBuffer.toString();
+      }
+
+      ConnectionFactory factory1 = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
+      ConnectionFactory factory2 = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT_2);
+
+      try (Connection connection = factory1.createConnection()) {
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Queue queue = session.createQueue(queueName);
+         MessageProducer producer = session.createProducer(queue);
+         producer.send(session.createTextMessage(bigString));
+      }
+
+      try (Connection connection = factory2.createConnection()) {
+         org.apache.activemq.artemis.core.server.Queue serverQueue = server.locateQueue(queueName);
+         Wait.assertEquals(1, serverQueue::getMessageCount);
+         Wait.assertEquals(1, () -> getNumberOfFiles(server_2.getConfiguration().getLargeMessagesLocation()), 5000, 100);
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Queue queue = session.createQueue(queueName);
+         MessageConsumer consumer = session.createConsumer(queue);
+         connection.start();
+         TextMessage message = (TextMessage)consumer.receive(5000);
+         Assert.assertNotNull(message);
+         Assert.assertEquals(bigString, message.getText());
+         Wait.assertEquals(0, () -> getNumberOfFiles(server_2.getConfiguration().getLargeMessagesLocation()), 5000, 100);
+      }
+
+      try (Connection connection = factory1.createConnection()) {
+         org.apache.activemq.artemis.core.server.Queue serverQueue = server.locateQueue(queueName);
+
+         Wait.assertEquals(0, serverQueue::getMessageCount);
+
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Queue queue = session.createQueue(queueName);
+         MessageConsumer consumer = session.createConsumer(queue);
+         connection.start();
+         TextMessage message = (TextMessage)consumer.receiveNoWait();
+         Assert.assertNull(message);
+         server.stop();
+         server_2.stop();
+         Wait.assertEquals(0, () -> getNumberOfFiles(server.getConfiguration().getLargeMessagesLocation()), 1000, 100);
+      }
+   }
+
 }
\ No newline at end of file