You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2022/03/15 12:56:36 UTC
[activemq-artemis] branch main updated: ARTEMIS-3721 AMQP Mirrored Large Message file not removed
This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 861fe59 ARTEMIS-3721 AMQP Mirrored Large Message file not removed
861fe59 is described below
commit 861fe59124732420dc9a66f292aa4ada968e103a
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Mon Mar 14 22:36:24 2022 -0400
ARTEMIS-3721 AMQP Mirrored Large Message file not removed
---
.../connect/mirror/AMQPMirrorControllerSource.java | 2 -
.../integration/amqp/connect/BrokerInSyncTest.java | 92 ++++++++++++++++++++++
2 files changed, 92 insertions(+), 2 deletions(-)
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
index b6d781d..6814aae 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
@@ -218,8 +218,6 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
}
snfQueue.refUp(ref);
refs.add(ref);
- message.usageUp();
-
if (message.isDurable() && snfQueue.isDurable()) {
PostOfficeImpl.storeDurableReference(server.getStorageManager(), message, context.getTransaction(), snfQueue, true);
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/BrokerInSyncTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/BrokerInSyncTest.java
index 323fdfd..ade8bab 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/BrokerInSyncTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/BrokerInSyncTest.java
@@ -60,6 +60,20 @@ public class BrokerInSyncTest extends AmqpClientTestSupport {
private static final Logger logger = Logger.getLogger(BrokerInSyncTest.class);
ActiveMQServer server_2;
+ @After
+ public void stopServer1() throws Exception {
+ if (server != null) {
+ server.stop();
+ }
+ }
+
+ @After
+ public void stopServer2() throws Exception {
+ if (server_2 != null) {
+ server_2.stop();
+ }
+ }
+
@Before
public void startLogging() {
AssertionLoggerHandler.startCapture();
@@ -422,4 +436,82 @@ public class BrokerInSyncTest extends AmqpClientTestSupport {
server.stop();
}
+ @Test
+ public void testLargeMessageInSync() throws Exception {
+ String queueName = "testSyncLargeMessage";
+ server.setIdentity("Server1");
+ {
+ AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("connectTowardsServer2", "tcp://localhost:" + AMQP_PORT_2).setReconnectAttempts(3).setRetryInterval(100);
+ amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setDurable(true));
+ server.getConfiguration().addAMQPConnection(amqpConnection);
+ }
+ server.start();
+
+ server_2 = createServer(AMQP_PORT_2, false);
+ server_2.setIdentity("Server2");
+
+ {
+ AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("connectTowardsServer1", "tcp://localhost:" + AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100);
+ amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setDurable(true));
+ server_2.getConfiguration().addAMQPConnection(amqpConnection);
+ }
+
+ server_2.start();
+
+ server_2.addAddressInfo(new AddressInfo(queueName).setAutoCreated(false).addRoutingType(RoutingType.ANYCAST));
+ server_2.createQueue(new QueueConfiguration(queueName).setDurable(true).setRoutingType(RoutingType.ANYCAST));
+
+ Wait.assertTrue(() -> server_2.locateQueue(queueName) != null);
+ Wait.assertTrue(() -> server.locateQueue(queueName) != null);
+
+ String bigString;
+ {
+ StringBuffer bigStringBuffer = new StringBuffer();
+ while (bigStringBuffer.length() < 200 * 1024) {
+ bigStringBuffer.append("This is a big string ");
+ }
+ bigString = bigStringBuffer.toString();
+ }
+
+ ConnectionFactory factory1 = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
+ ConnectionFactory factory2 = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT_2);
+
+ try (Connection connection = factory1.createConnection()) {
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue(queueName);
+ MessageProducer producer = session.createProducer(queue);
+ producer.send(session.createTextMessage(bigString));
+ }
+
+ try (Connection connection = factory2.createConnection()) {
+ org.apache.activemq.artemis.core.server.Queue serverQueue = server.locateQueue(queueName);
+ Wait.assertEquals(1, serverQueue::getMessageCount);
+ Wait.assertEquals(1, () -> getNumberOfFiles(server_2.getConfiguration().getLargeMessagesLocation()), 5000, 100);
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue(queueName);
+ MessageConsumer consumer = session.createConsumer(queue);
+ connection.start();
+ TextMessage message = (TextMessage)consumer.receive(5000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals(bigString, message.getText());
+ Wait.assertEquals(0, () -> getNumberOfFiles(server_2.getConfiguration().getLargeMessagesLocation()), 5000, 100);
+ }
+
+ try (Connection connection = factory1.createConnection()) {
+ org.apache.activemq.artemis.core.server.Queue serverQueue = server.locateQueue(queueName);
+
+ Wait.assertEquals(0, serverQueue::getMessageCount);
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue(queueName);
+ MessageConsumer consumer = session.createConsumer(queue);
+ connection.start();
+ TextMessage message = (TextMessage)consumer.receiveNoWait();
+ Assert.assertNull(message);
+ server.stop();
+ server_2.stop();
+ Wait.assertEquals(0, () -> getNumberOfFiles(server.getConfiguration().getLargeMessagesLocation()), 1000, 100);
+ }
+ }
+
}
\ No newline at end of file