You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2022/10/26 17:10:12 UTC
[activemq-artemis] branch main updated: ARTEMIS-4045 Fixing in Handler ACKRunner on Mirror
This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 87ec9b5465 ARTEMIS-4045 Fixing in Handler ACKRunner on Mirror
87ec9b5465 is described below
commit 87ec9b54654d3a05d2677907fa643c2b8d0289c7
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Wed Oct 26 13:00:57 2022 -0400
ARTEMIS-4045 Fixing in Handler ACKRunner on Mirror
---
.../connect/mirror/AMQPMirrorControllerTarget.java | 7 ++-
.../integration/amqp/connect/AMQPReplicaTest.java | 51 ++++++++++++++++++++++
2 files changed, 57 insertions(+), 1 deletion(-)
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
index 4ca9c9dcd6..74edd0a42e 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
@@ -120,6 +120,11 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
@Override
public void run() {
+ if (!connection.isHandler()) {
+ logger.info("Moving execution to proton handler");
+ connectionRun();
+ return;
+ }
logger.trace("Delivery settling for {}, context={}", delivery, delivery.getContext());
delivery.disposition(Accepted.getInstance());
settle(delivery);
@@ -383,7 +388,7 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
performAckOnPage(nodeID, messageID, targetQueue, ackMessageOperation);
return;
} else {
- ackMessageOperation.run();
+ connection.runNow(ackMessageOperation);
}
}
}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java
index bd5ab073d5..d2d2e0e022 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java
@@ -39,8 +39,11 @@ import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirror
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
+import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource;
+import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorMessageFactory;
import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.RandomUtil;
@@ -117,6 +120,54 @@ public class AMQPReplicaTest extends AmqpClientTestSupport {
}
+ @Test
+ public void testNotFoundRetries() throws Exception { // POST_ACK events whose message is not on the target must still drain from the mirror queue
+ server.setIdentity("Server1");
+
+ server.start();
+
+ server_2 = createServer(AMQP_PORT_2, false);
+
+ AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("test", "tcp://localhost:" + AMQP_PORT);
+ amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setDurable(true)); // broker connection "test" on server_2 carries a durable mirror element
+ server_2.getConfiguration().addAMQPConnection(amqpConnection);
+
+ server_2.start();
+
+ server_2.addAddressInfo(new AddressInfo("sometest").setAutoCreated(false));
+ server_2.createQueue(new QueueConfiguration("sometest").setDurable(true));
+
+
+ Wait.assertTrue(() -> server_2.locateQueue("sometest") != null);
+
+
+ Wait.waitFor(() -> server_2.locateQueue("$ACTIVEMQ_ARTEMIS_MIRROR_test") != null); // result ignored; the assertNotNull below is the actual check. NOTE(review): queue name appears derived from the connection name "test" - confirm
+ Queue mirrorQueue = server_2.locateQueue("$ACTIVEMQ_ARTEMIS_MIRROR_test");
+ Assert.assertNotNull(mirrorQueue);
+
+
+ AssertionLoggerHandler.startCapture();
+ runAfter(() -> AssertionLoggerHandler.stopCapture()); // presumably runAfter defers this to test cleanup so capture stops even on failure - confirm
+
+ // Route a POST_ACK event with AckReason.EXPIRED (an expiry) whose message will never be found on the target
+ org.apache.activemq.artemis.api.core.Message message = AMQPMirrorMessageFactory.createMessage(mirrorQueue.getAddress().toString(), SimpleString.toSimpleString("sometest"), SimpleString.toSimpleString("sometest"), AMQPMirrorControllerSource.POST_ACK, "0000", 3333L, AckReason.EXPIRED).setDurable(true);
+ message.setMessageID(server_2.getStorageManager().generateID());
+ server_2.getPostOffice().route(message, false);
+
+ // Route a second POST_ACK event that will never be found on the target, this time with AckReason.NORMAL (a regular ack)
+ message = AMQPMirrorMessageFactory.createMessage(mirrorQueue.getAddress().toString(), SimpleString.toSimpleString("sometest"), SimpleString.toSimpleString("sometest"), AMQPMirrorControllerSource.POST_ACK, "0000", 3334L, AckReason.NORMAL).setDurable(true);
+ message.setMessageID(server_2.getStorageManager().generateID());
+ server_2.getPostOffice().route(message, false);
+
+ Wait.assertEquals(0L, mirrorQueue::getMessageCount, 2000, 100); // mirror queue must drain; 2000 and 100 are presumably timeout and poll interval in ms - confirm
+ Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224041")); // fails if the captured log contains "AMQ224041"; NOTE(review): confirm which broker log message this code denotes
+ AssertionLoggerHandler.stopCapture(); // explicit stop, in addition to the runAfter hook registered above
+
+ server_2.stop();
+ server.stop();
+ }
+
+
@Test
public void testDeleteQueueWithRemoveFalse() throws Exception {
server.setIdentity("Server1");