You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2022/10/26 17:10:12 UTC

[activemq-artemis] branch main updated: ARTEMIS-4045 Fixing in Handler ACKRunner on Mirror

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 87ec9b5465 ARTEMIS-4045 Fixing in Handler ACKRunner on Mirror
87ec9b5465 is described below

commit 87ec9b54654d3a05d2677907fa643c2b8d0289c7
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Wed Oct 26 13:00:57 2022 -0400

    ARTEMIS-4045 Fixing in Handler ACKRunner on Mirror
---
 .../connect/mirror/AMQPMirrorControllerTarget.java |  7 ++-
 .../integration/amqp/connect/AMQPReplicaTest.java  | 51 ++++++++++++++++++++++
 2 files changed, 57 insertions(+), 1 deletion(-)

diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
index 4ca9c9dcd6..74edd0a42e 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
@@ -120,6 +120,11 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
 
       @Override
       public void run() {
+         if (!connection.isHandler()) {
+            logger.info("Moving execution to proton handler");
+            connectionRun();
+            return;
+         }
          logger.trace("Delivery settling for {}, context={}", delivery, delivery.getContext());
          delivery.disposition(Accepted.getInstance());
          settle(delivery);
@@ -383,7 +388,7 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
                   performAckOnPage(nodeID, messageID, targetQueue, ackMessageOperation);
                   return;
                } else {
-                  ackMessageOperation.run();
+                  connection.runNow(ackMessageOperation);
                }
          }
       }
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java
index bd5ab073d5..d2d2e0e022 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java
@@ -39,8 +39,11 @@ import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirror
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.impl.AckReason;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
+import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource;
+import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorMessageFactory;
 import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
 import org.apache.activemq.artemis.tests.util.CFUtil;
 import org.apache.activemq.artemis.tests.util.RandomUtil;
@@ -117,6 +120,54 @@ public class AMQPReplicaTest extends AmqpClientTestSupport {
    }
 
 
+   @Test
+   public void testNotFoundRetries() throws Exception {
+      server.setIdentity("Server1");
+
+      server.start();
+
+      server_2 = createServer(AMQP_PORT_2, false);
+
+      AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("test", "tcp://localhost:" + AMQP_PORT);
+      amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setDurable(true));
+      server_2.getConfiguration().addAMQPConnection(amqpConnection);
+
+      server_2.start();
+
+      server_2.addAddressInfo(new AddressInfo("sometest").setAutoCreated(false));
+      server_2.createQueue(new QueueConfiguration("sometest").setDurable(true));
+
+
+      Wait.assertTrue(() -> server_2.locateQueue("sometest") != null);
+
+
+      Wait.waitFor(() -> server_2.locateQueue("$ACTIVEMQ_ARTEMIS_MIRROR_test") != null);
+      Queue mirrorQueue = server_2.locateQueue("$ACTIVEMQ_ARTEMIS_MIRROR_test");
+      Assert.assertNotNull(mirrorQueue);
+
+
+      AssertionLoggerHandler.startCapture();
+      runAfter(() -> AssertionLoggerHandler.stopCapture());
+
+      // Adding some PostAck event that will never be found on the target for an expiry
+      org.apache.activemq.artemis.api.core.Message message = AMQPMirrorMessageFactory.createMessage(mirrorQueue.getAddress().toString(), SimpleString.toSimpleString("sometest"), SimpleString.toSimpleString("sometest"), AMQPMirrorControllerSource.POST_ACK, "0000", 3333L, AckReason.EXPIRED).setDurable(true);
+      message.setMessageID(server_2.getStorageManager().generateID());
+      server_2.getPostOffice().route(message, false);
+
+      // Adding some PostAck event that will never be found on the target for a regular ack
+      message = AMQPMirrorMessageFactory.createMessage(mirrorQueue.getAddress().toString(), SimpleString.toSimpleString("sometest"), SimpleString.toSimpleString("sometest"), AMQPMirrorControllerSource.POST_ACK, "0000", 3334L, AckReason.NORMAL).setDurable(true);
+      message.setMessageID(server_2.getStorageManager().generateID());
+      server_2.getPostOffice().route(message, false);
+
+      Wait.assertEquals(0L, mirrorQueue::getMessageCount, 2000, 100);
+      Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224041"));
+      AssertionLoggerHandler.stopCapture();
+
+      server_2.stop();
+      server.stop();
+   }
+
+
    @Test
    public void testDeleteQueueWithRemoveFalse() throws Exception {
       server.setIdentity("Server1");