You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2024/01/16 00:13:59 UTC

(activemq-artemis) 04/04: ARTEMIS-4558 Idempotent Mirrored ACKs

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit bc7e4639e0a4665c106f336fb837c33d2cbcd361
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Fri Jan 5 16:48:11 2024 -0500

    ARTEMIS-4558 Idempotent Mirrored ACKs
    
    Mirror ACKs should be performed atomically with the storage of the source ACK. Both the sending of the ACK and the recording of the ACK should be part of the same transaction (when the ACK is transactional).
    
    We are also adding support in transactions for an afterWired callback, so that the OperationContext sync can be plugged in properly.
---
 .../connect/mirror/AMQPMirrorControllerSource.java |  85 +++---
 .../connect/mirror/AMQPMirrorControllerTarget.java |  12 +-
 .../integration/amqp/connect/AMQPReplicaTest.java  |  64 +++++
 .../amqp/connect/AMQPSyncMirrorTest.java           | 132 ++++++++-
 .../amqp/connect/MirrorControllerBasicTest.java    |   2 +-
 .../brokerConnection/mirror/IdempotentACKTest.java | 313 +++++++++++++++++++++
 .../brokerConnection/sender/SenderSoakTest.java    | 178 ++++++++++++
 7 files changed, 737 insertions(+), 49 deletions(-)

diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
index 144ec59a39..1d4d56df08 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
@@ -35,6 +35,7 @@ import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.RoutingContext;
 import org.apache.activemq.artemis.core.server.RoutingContext.MirrorOption;
+import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
 import org.apache.activemq.artemis.core.server.impl.AckReason;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
@@ -65,6 +66,7 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
    public static final Symbol ADDRESS = Symbol.getSymbol("x-opt-amq-mr-adr");
    public static final Symbol QUEUE = Symbol.getSymbol("x-opt-amq-mr-qu");
    public static final Symbol BROKER_ID = Symbol.getSymbol("x-opt-amq-bkr-id");
+   public static final SimpleString BROKER_ID_SIMPLE_STRING = SimpleString.toSimpleString(BROKER_ID.toString());
 
    // Events:
    public static final Symbol ADD_ADDRESS = Symbol.getSymbol("addAddress");
@@ -154,7 +156,7 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
 
       if (addQueues) {
          Message message = createMessage(addressInfo.getName(), null, ADD_ADDRESS, null, addressInfo.toJSON());
-         route(server, message);
+         routeMirrorCommand(server, message);
       }
    }
 
@@ -170,7 +172,7 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
       }
       if (deleteQueues) {
          Message message = createMessage(addressInfo.getName(), null, DELETE_ADDRESS, null, addressInfo.toJSON());
-         route(server, message);
+         routeMirrorCommand(server, message);
       }
    }
 
@@ -193,7 +195,7 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
       }
       if (addQueues) {
          Message message = createMessage(queueConfiguration.getAddress(), queueConfiguration.getName(), CREATE_QUEUE, null, queueConfiguration.toJSON());
-         route(server, message);
+         routeMirrorCommand(server, message);
       }
    }
 
@@ -213,15 +215,34 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
 
       if (deleteQueues) {
          Message message = createMessage(address, queue, DELETE_QUEUE, null, queue.toString());
-         route(server, message);
+         routeMirrorCommand(server, message);
       }
    }
 
+   private boolean invalidTarget(MirrorController controller, Message message) {
+      if (controller == null) {
+         return false;
+      }
+      String remoteID = getRemoteMirrorId();
+      if (remoteID == null) {
+         // This avoids a reflection (the mirror sending messages back to itself) during the short period in which one node has reconnected but the opposite direction has not.
+         Object localRemoteID = message.getAnnotation(BROKER_ID_SIMPLE_STRING);
+         if (localRemoteID != null) {
+            remoteID = String.valueOf(localRemoteID);
+            logger.debug("Remote link is not initialized yet, setting remoteID from message as {}", remoteID);
+         }
+      }
+      return sameNode(remoteID, controller.getRemoteMirrorId());
+   }
+
    private boolean invalidTarget(MirrorController controller) {
       return controller != null && sameNode(getRemoteMirrorId(), controller.getRemoteMirrorId());
    }
 
    private boolean ignoreAddress(SimpleString address) {
+      if (address.startsWith(server.getConfiguration().getManagementAddress())) {
+         return true;
+      }
       return !addressFilter.match(address);
    }
 
@@ -238,7 +259,7 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
          return;
       }
 
-      if (invalidTarget(context.getMirrorSource())) {
+      if (invalidTarget(context.getMirrorSource(), message)) {
          logger.trace("sendMessage::server {} is discarding send to avoid infinite loop (reflection with the mirror)", server);
          return;
       }
@@ -444,13 +465,14 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
          MirrorACKOperation operation = getAckOperation(tx);
          // notice the operationContext.replicationLineUp is done on beforeCommit as part of the TX
          operation.addMessage(messageCommand, ref);
+         routeMirrorCommand(server, messageCommand, tx);
       } else {
          server.getStorageManager().afterStoreOperations(new IOCallback() {
             @Override
             public void done() {
                try {
                   logger.debug("preAcknowledge::afterStoreOperation for messageReference {}", ref);
-                  route(server, messageCommand);
+                  routeMirrorCommand(server, messageCommand);
                } catch (Exception e) {
                   logger.warn(e.getMessage(), e);
                }
@@ -469,7 +491,7 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
          logger.trace("getAckOperation::setting operation on transaction {}", tx);
          ackOperation = new MirrorACKOperation(server);
          tx.putProperty(TransactionPropertyIndexes.MIRROR_ACK_OPERATION, ackOperation);
-         tx.afterStore(ackOperation);
+         tx.afterWired(ackOperation);
       }
 
       return ackOperation;
@@ -490,7 +512,7 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
       return sendOperation;
    }
 
-   private static class MirrorACKOperation extends TransactionOperationAbstract {
+   private static class MirrorACKOperation implements Runnable {
 
       final ActiveMQServer server;
 
@@ -511,47 +533,19 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
       }
 
       @Override
-      public void beforeCommit(Transaction tx) {
-         logger.debug("MirrorACKOperation::beforeCommit processing {}", acks);
-         acks.forEach(this::doBeforeCommit);
+      public void run() {
+         logger.debug("MirrorACKOperation::wired processing {}", acks);
+         acks.forEach(this::doWired);
       }
 
       // callback to be used on forEach
-      private void doBeforeCommit(Message ack, MessageReference ref) {
+      private void doWired(Message ack, MessageReference ref) {
          OperationContext context = (OperationContext) ack.getUserContext(OperationContext.class);
          if (context != null) {
             context.replicationLineUp();
          }
       }
 
-      @Override
-      public void afterCommit(Transaction tx) {
-         logger.debug("MirrorACKOperation::afterCommit processing {}", acks);
-         acks.forEach(this::doAfterCommit);
-      }
-
-      // callback to be used on forEach
-      private void doAfterCommit(Message ack, MessageReference ref) {
-         try {
-            route(server, ack);
-         } catch (Exception e) {
-            logger.warn(e.getMessage(), e);
-         }
-         ref.getMessage().usageDown();
-      }
-
-      @Override
-      public void afterRollback(Transaction tx) {
-         acks.forEach(this::doAfterRollback);
-      }
-
-      // callback to be used on forEach
-      private void doAfterRollback(Message ack, MessageReference ref) {
-         OperationContext context = (OperationContext) ack.getUserContext(OperationContext.class);
-         if (context != null) {
-            context.replicationDone();
-         }
-      }
 
    }
 
@@ -609,10 +603,17 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
       return AMQPMirrorMessageFactory.createMessage(snfQueue.getAddress().toString(), address, queue, event, brokerID, body, ackReason);
    }
 
-   public static void route(ActiveMQServer server, Message message) throws Exception {
+   public static void routeMirrorCommand(ActiveMQServer server, Message message) throws Exception {
+      routeMirrorCommand(server, message, null);
+   }
+
+   public static void routeMirrorCommand(ActiveMQServer server, Message message, Transaction tx) throws Exception {
       message.setMessageID(server.getStorageManager().generateID());
       RoutingContext ctx = mirrorControlRouting.get();
-      ctx.clear().setMirrorOption(MirrorOption.disabled);
+      // it is important to use LOCAL_ONLY at the source to avoid having the message strictly load balanced
+      // to other nodes when the SNF queue has the same name as the one on this node.
+      ctx.clear().setMirrorOption(MirrorOption.disabled).setLoadBalancingType(MessageLoadBalancingType.LOCAL_ONLY).setTransaction(tx);
+      logger.debug("SetTX {}", tx);
       server.getPostOffice().route(message, ctx, false);
    }
 
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
index e979768c97..a211357305 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
@@ -448,7 +448,9 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
                   targetQueue.expire(reference, null, false);
                   break;
                default:
-                  targetQueue.acknowledge(null, reference, reason, null, false);
+                  TransactionImpl transaction = new TransactionImpl(server.getStorageManager()).setAsync(true);
+                  targetQueue.acknowledge(transaction, reference, reason, null, false);
+                  transaction.commit();
                   break;
             }
             OperationContextImpl.getContext().executeOnCompletion(ackMessageOperation);
@@ -470,7 +472,7 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
 
       String internalMirrorID = (String) deliveryAnnotations.getValue().get(BROKER_ID);
       if (internalMirrorID == null) {
-         internalMirrorID = getRemoteMirrorId(); // not pasisng the ID means the data was generated on the remote broker
+         internalMirrorID = getRemoteMirrorId(); // not passing the ID means the data was generated on the remote broker
       }
       Long internalIDLong = (Long) deliveryAnnotations.getValue().get(INTERNAL_ID);
       String internalAddress = (String) deliveryAnnotations.getValue().get(INTERNAL_DESTINATION);
@@ -516,7 +518,7 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
          message.setAddress(internalAddress);
       }
 
-      final TransactionImpl transaction = new MirrorTransaction(server.getStorageManager());
+      final TransactionImpl transaction = new MirrorTransaction(server.getStorageManager()).setAsync(true);
       transaction.addOperation(messageCompletionAck.tx);
       routingContext.setTransaction(transaction);
       duplicateIDCache.addToCache(duplicateIDBytes, transaction);
@@ -588,7 +590,9 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
             if (reference == null) {
                return false;
             } else {
-               targetQueue.acknowledge(null, reference, AckReason.NORMAL, null, false);
+               TransactionImpl tx = new TransactionImpl(server.getStorageManager()).setAsync(true);
+               targetQueue.acknowledge(tx, reference, AckReason.NORMAL, null, false);
+               tx.commit();
                OperationContextImpl.getContext().executeOnCompletion(operation);
                return true;
             }
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java
index 8e798af4d1..ba0412d8ae 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java
@@ -1306,5 +1306,69 @@ public class AMQPReplicaTest extends AmqpClientTestSupport {
    }
 
 
+   @Test
+   public void testSimpleReplicaTX() throws Exception {
+
+      String brokerConnectionName = "brokerConnectionName:" + UUIDGenerator.getInstance().generateStringUUID();
+      server.setIdentity("targetServer");
+      server.start();
+      server_2 = createServer(AMQP_PORT_2, false);
+      server_2.setIdentity("server_2");
+      server_2.getConfiguration().setName("thisone");
+
+      AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(brokerConnectionName, "tcp://localhost:" + AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100);
+      AMQPMirrorBrokerConnectionElement replica = new AMQPMirrorBrokerConnectionElement().setMessageAcknowledgements(true).setDurable(true);
+      replica.setName("theReplica");
+      amqpConnection.addElement(replica);
+      server_2.getConfiguration().addAMQPConnection(amqpConnection);
+      server_2.getConfiguration().setName("server_2");
+
+      int NUMBER_OF_MESSAGES = 10;
+
+      server_2.start();
+      Wait.assertTrue(server_2::isStarted);
+
+      // We create the address to avoid auto delete on the queue
+      server_2.addAddressInfo(new AddressInfo(getQueueName()).addRoutingType(RoutingType.ANYCAST).setAutoCreated(false));
+      server_2.createQueue(new QueueConfiguration(getQueueName()).setRoutingType(RoutingType.ANYCAST).setAddress(getQueueName()).setAutoCreated(false));
+
+      ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT_2);
+      Connection connection = factory.createConnection();
+      Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+      MessageProducer producer = session.createProducer(session.createQueue(getQueueName()));
+
+      for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+         Message message = session.createTextMessage(getText(true, i));
+         message.setIntProperty("i", i);
+         producer.send(message);
+      }
+      session.commit();
+
+      Queue queueOnServer1 = locateQueue(server, getQueueName());
+      Queue snfreplica = server_2.locateQueue(replica.getMirrorSNF());
+      Assert.assertNotNull(snfreplica);
+
+      Wait.assertEquals(0, snfreplica::getMessageCount);
+
+      Wait.assertEquals(NUMBER_OF_MESSAGES, queueOnServer1::getMessageCount, 2000);
+      Queue queueOnServer2 = locateQueue(server_2, getQueueName());
+      Wait.assertEquals(NUMBER_OF_MESSAGES, queueOnServer1::getMessageCount);
+      Wait.assertEquals(NUMBER_OF_MESSAGES, queueOnServer2::getMessageCount);
+
+      MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName()));
+      connection.start();
+
+      for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+         Message m = consumer.receive(1000);
+         Assert.assertNotNull(m);
+      }
+      session.commit();
+
+      Wait.assertEquals(0, snfreplica::getMessageCount);
+      Wait.assertEquals(0, queueOnServer1::getMessageCount);
+      Wait.assertEquals(0, queueOnServer2::getMessageCount);
+   }
+
+
 
 }
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPSyncMirrorTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPSyncMirrorTest.java
index 9895d134d1..de3128c6bf 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPSyncMirrorTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPSyncMirrorTest.java
@@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.artemis.api.core.QueueConfiguration;
 import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.jms.ActiveMQJMSConstants;
 import org.apache.activemq.artemis.core.config.Configuration;
 import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
 import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
@@ -385,6 +386,7 @@ public class AMQPSyncMirrorTest extends AmqpClientTestSupport {
    }
 
    private interface StorageCallback {
+
       void storage(boolean isUpdate,
                    boolean isCommit,
                    long txID,
@@ -424,7 +426,7 @@ public class AMQPSyncMirrorTest extends AmqpClientTestSupport {
                                            Object record,
                                            boolean sync,
                                            IOCompletion callback) throws Exception {
-                  storageCallback.storage(false, false,  -1, id, recordType, persister, record);
+                  storageCallback.storage(false, false, -1, id, recordType, persister, record);
                   super.appendAddRecord(id, recordType, persister, record, sync, callback);
                }
 
@@ -451,7 +453,7 @@ public class AMQPSyncMirrorTest extends AmqpClientTestSupport {
                                               boolean sync,
                                               IOCompletion callback,
                                               boolean lineUpContext) throws Exception {
-                  storageCallback.storage(false, true, txID, txID, (byte)0, null, null);
+                  storageCallback.storage(false, true, txID, txID, (byte) 0, null, null);
                   super.appendCommitRecord(txID, sync, callback, lineUpContext);
                }
 
@@ -471,4 +473,130 @@ public class AMQPSyncMirrorTest extends AmqpClientTestSupport {
          }
       };
    }
+
+
+   @Test
+   public void testSimpleACK_TX_AMQP() throws Exception {
+      testSimpleAckSync("AMQP", true, false, 1024);
+   }
+
+   @Test
+   public void testSimpleACK_TX_CORE() throws Exception {
+      testSimpleAckSync("CORE", true, false, 1024);
+   }
+
+   @Test
+   public void testSimpleACK_NoTX_AMQP() throws Exception {
+      testSimpleAckSync("AMQP", false, false, 1024);
+   }
+
+   @Test
+   public void testSimpleACK_NoTX_CORE() throws Exception {
+      testSimpleAckSync("CORE", false, false, 1024);
+   }
+
+   @Test
+   public void testSimpleACK_NoTX_CORE_Large() throws Exception {
+      testSimpleAckSync("CORE", false, false, 255 * 1024);
+   }
+
+   @Test
+   public void testSimpleACK_TX_CORE_Large() throws Exception {
+      testSimpleAckSync("CORE", true, false, 255 * 1024);
+   }
+
+   @Test
+   public void testSimple_Core_Individual_Large() throws Exception {
+      testSimpleAckSync("CORE", false, true, 255 * 1024);
+   }
+
+   @Test
+   public void testSimple_Core_Individual() throws Exception {
+      testSimpleAckSync("CORE", false, true, 1024);
+   }
+
+   public void testSimpleAckSync(final String protocol, final boolean tx, final boolean individualAck, int messageSize) throws Exception {
+      AtomicInteger errors = new AtomicInteger(0);
+
+      final int NUMBER_OF_MESSAGES = 10;
+
+      slowServer = createServerWithCallbackStorage(SLOW_SERVER_PORT, SLOW_SERVER_NAME, (isUpdate, isTX, txId, id, recordType, persister, record) -> {
+      });
+      slowServer.setIdentity("slowServer");
+      server.setIdentity("server");
+
+      ExecutorService pool = Executors.newFixedThreadPool(5);
+      runAfter(pool::shutdown);
+
+      configureMirrorTowardsSlow(server);
+
+      slowServer.getConfiguration().setName("slow");
+      server.getConfiguration().setName("fast");
+      slowServer.start();
+      server.start();
+
+      waitForServerToStart(slowServer);
+      waitForServerToStart(server);
+
+      server.addAddressInfo(new AddressInfo(getQueueName()).addRoutingType(RoutingType.ANYCAST).setAutoCreated(false));
+      server.createQueue(new QueueConfiguration(getQueueName()).setRoutingType(RoutingType.ANYCAST).setAddress(getQueueName()).setAutoCreated(false));
+
+      Wait.waitFor(() -> slowServer.locateQueue(getQueueName()) != null);
+      Queue replicatedQueue = slowServer.locateQueue(getQueueName());
+
+      ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + AMQP_PORT);
+
+      if (factory instanceof ActiveMQConnectionFactory) {
+         ((ActiveMQConnectionFactory) factory).getServerLocator().setBlockOnAcknowledge(true);
+      }
+
+      Connection connection = factory.createConnection();
+      runAfter(connection::close);
+      Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+      MessageProducer producer = session.createProducer(session.createQueue(getQueueName()));
+
+      connection.start();
+
+      producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+      final String bodyMessage;
+      {
+         StringBuffer buffer = new StringBuffer();
+         for (int i = 0; i < messageSize; i++) {
+            buffer.append("large Buffer...");
+         }
+         bodyMessage = buffer.toString();
+      }
+
+      for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+         int theI = i;
+         TextMessage message = session.createTextMessage(bodyMessage);
+         message.setStringProperty("strProperty", "" + theI);
+         producer.send(message);
+         Wait.assertEquals(i + 1, replicatedQueue::getMessageCount, 5000);
+      }
+
+      Wait.assertEquals(NUMBER_OF_MESSAGES, replicatedQueue::getMessageCount);
+
+      connection.start();
+
+      Session clientSession = connection.createSession(tx, tx ? Session.SESSION_TRANSACTED : (individualAck ? ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE : Session.CLIENT_ACKNOWLEDGE));
+      MessageConsumer consumer = clientSession.createConsumer(clientSession.createQueue(getQueueName()));
+
+      for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+         Message message = consumer.receive(5000);
+         Assert.assertNotNull(message);
+
+         message.acknowledge();
+
+         if (tx) {
+            clientSession.commit();
+         }
+
+         Wait.assertEquals(NUMBER_OF_MESSAGES - i - 1, replicatedQueue::getMessageCount, 5000);
+      }
+
+      Assert.assertEquals(0, errors.get());
+   }
+
 }
\ No newline at end of file
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/MirrorControllerBasicTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/MirrorControllerBasicTest.java
index cd72dca36a..671e307f43 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/MirrorControllerBasicTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/MirrorControllerBasicTest.java
@@ -94,7 +94,7 @@ public class MirrorControllerBasicTest extends ActiveMQTestBase {
       server.createQueue(new QueueConfiguration("test").setAddress("test").setRoutingType(RoutingType.ANYCAST));
 
       Message message = AMQPMirrorMessageFactory.createMessage("test", SimpleString.toSimpleString("ad1"), SimpleString.toSimpleString("qu1"), "test", "someUID", "body-test", AckReason.KILLED);
-      AMQPMirrorControllerSource.route(server, message);
+      AMQPMirrorControllerSource.routeMirrorCommand(server, message);
 
       AmqpClient client = new AmqpClient(new URI("tcp://localhost:61616"), null, null);
       AmqpConnection connection = client.connect();
diff --git a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/IdempotentACKTest.java b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/IdempotentACKTest.java
new file mode 100644
index 0000000000..f3cffea7a7
--- /dev/null
+++ b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/IdempotentACKTest.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.soak.brokerConnection.mirror;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.TransactionRolledBackException;
+import java.io.File;
+import java.io.StringWriter;
+import java.lang.invoke.MethodHandles;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.artemis.api.core.management.SimpleManagement;
+import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType;
+import org.apache.activemq.artemis.tests.soak.SoakTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.apache.activemq.artemis.util.ServerUtil;
+import org.apache.activemq.artemis.utils.cli.helper.HelperCreate;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class IdempotentACKTest extends SoakTestBase {
+
+   private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   private static String largeBody;
+
+   static { // runs once at class load to build the shared large-message payload
+      StringWriter writer = new StringWriter();
+      while (writer.getBuffer().length() < 1024 * 1024) { // keep appending until at least 1024 * 1024 chars are buffered
+         writer.append("This is a large string ..... ");
+      }
+      largeBody = writer.toString();
+   }
+
+   private static final String QUEUE_NAME = "myQueue";
+
+   public static final String DC1_NODE_A = "idempotentMirror/DC1";
+   public static final String DC2_NODE_A = "idempotentMirror/DC2";
+
+   Process processDC1_node_A;
+   Process processDC2_node_A;
+
+   private static String DC1_NODEA_URI = "tcp://localhost:61616";
+   private static String DC2_NODEA_URI = "tcp://localhost:61618";
+
+   private static void createServer(String serverName, // instance directory name, also passed as the broker --name
+                                    String connectionName, // key used under AMQPConnections.* in broker.properties
+                                    String mirrorURI, // URI of the peer broker this instance mirrors to
+                                    int portOffset) throws Exception { // handed to HelperCreate.setPortOffset
+      File serverLocation = getFileServerLocation(serverName);
+      deleteDirectory(serverLocation); // always recreate the instance from scratch
+
+      HelperCreate cliCreateServer = new HelperCreate();
+      cliCreateServer.setAllowAnonymous(true).setNoWeb(true).setArtemisInstance(serverLocation);
+      cliCreateServer.setMessageLoadBalancing("ON_DEMAND");
+      cliCreateServer.setClustered(false);
+      // each broker is named after its own instance (previously every broker was created with the DC1 name)
+      cliCreateServer.setArgs("--no-stomp-acceptor", "--no-hornetq-acceptor", "--no-mqtt-acceptor", "--no-amqp-acceptor", "--max-hops", "1", "--name", serverName);
+      cliCreateServer.addArgs("--queues", QUEUE_NAME);
+      cliCreateServer.setPortOffset(portOffset);
+      cliCreateServer.createServer();
+
+      Properties brokerProperties = new Properties(); // mirror connection settings, written next to the instance as broker.properties
+      brokerProperties.put("AMQPConnections." + connectionName + ".uri", mirrorURI);
+      brokerProperties.put("AMQPConnections." + connectionName + ".retryInterval", "1000");
+      brokerProperties.put("AMQPConnections." + connectionName + ".type", AMQPBrokerConnectionAddressType.MIRROR.toString());
+      brokerProperties.put("AMQPConnections." + connectionName + ".connectionElements.mirror.sync", "false");
+      brokerProperties.put("largeMessageSync", "false");
+      File brokerPropertiesFile = new File(serverLocation, "broker.properties");
+      saveProperties(brokerProperties, brokerPropertiesFile);
+   }
+
+   @BeforeClass
+   public static void createServers() throws Exception { // creates both broker instances, each configured to mirror to the other
+      createServer(DC1_NODE_A, "mirror", DC2_NODEA_URI, 0); // DC1: port offset 0, mirror target DC2_NODEA_URI
+      createServer(DC2_NODE_A, "mirror", DC1_NODEA_URI, 2); // DC2: port offset 2, mirror target DC1_NODEA_URI
+   }
+
+   private void startServers() throws Exception { // launches both broker processes with their broker.properties, then waits on each
+      processDC1_node_A = startServer(DC1_NODE_A, -1, -1, new File(getServerLocation(DC1_NODE_A), "broker.properties"));
+      processDC2_node_A = startServer(DC2_NODE_A, -1, -1, new File(getServerLocation(DC2_NODE_A), "broker.properties"));
+
+      ServerUtil.waitForServerToStart(0, 10_000); // NOTE(review): first argument presumably identifies the broker by id/port offset (0 = DC1) - confirm against ServerUtil
+      ServerUtil.waitForServerToStart(2, 10_000); // 2 matches the port offset DC2 was created with
+   }
+
+   @Before
+   public void cleanupServers() { // calls cleanupData for both broker instances before each test method
+      cleanupData(DC1_NODE_A);
+      cleanupData(DC2_NODE_A);
+   }
+
+   private void transactSend(Session session, MessageProducer producer, int initialCounter, int numberOfMessages, int largeMessageFactor) throws Throwable { // sends messages [initialCounter, initialCounter + numberOfMessages) and commits them as one transaction
+      try {
+         for (int i = initialCounter; i < initialCounter + numberOfMessages; i++) {
+            TextMessage message;
+            String unique = "Unique " + i; // deterministic per-index id, so resending the same range reuses the same ids
+            if (i % largeMessageFactor == 0) { // every largeMessageFactor-th index carries the large body
+               message = session.createTextMessage(largeBody);
+               message.setBooleanProperty("large", true);
+            } else {
+               message = session.createTextMessage("this is small");
+               message.setBooleanProperty("large", false);
+            }
+            message.setIntProperty("i", i);
+            message.setStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(), unique);
+            producer.send(message);
+         }
+         session.commit();
+      } catch (JMSException e) {
+         if (e instanceof TransactionRolledBackException && e.getMessage().contains("Duplicate message detected")) { // a duplicate-detection rollback is treated as success
+            logger.debug("OK Exception {}", e.getMessage(), e);
+            return; // ok
+         } else {
+            logger.warn("Not OK Exception {}", e.getMessage(), e);
+            throw e; // any other JMS failure is propagated to the caller
+         }
+      }
+   }
+
+
+   @Test
+   public void testAMQP() throws Exception { // runs the shared ACK scenario with "AMQP" connection factories
+      testACKs("AMQP");
+   }
+
+   @Test
+   public void testCORE() throws Exception { // runs the shared ACK scenario with "CORE" connection factories
+      testACKs("CORE");
+   }
+
+   private void testACKs(final String protocol) throws Exception { // sends and consumes on DC1 while DC1 is killed and restarted twice, then checks the DC1 mirror queue and the DC2 copy drain to zero
+      startServers();
+
+      final int consumers = 10;
+      final int numberOfMessages = 1000;
+      final int largeMessageFactor = 30;
+      final int messagesPerConsumer = 30;
+
+      // Just a reminder: if you change the numbers in this test, this must still hold:
+      Assert.assertEquals("Invalid test config", 0, numberOfMessages % consumers);
+
+      AtomicBoolean running = new AtomicBoolean(true); // lets the worker loops stop once the test is over
+      runAfter(() -> running.set(false));
+
+      String snfQueue = "$ACTIVEMQ_ARTEMIS_MIRROR_mirror"; // NOTE(review): presumably the mirror store-and-forward queue for the connection named "mirror" - confirm
+
+      ExecutorService executor = Executors.newFixedThreadPool(consumers); // shared by the single producer task and, later, the consumer tasks
+      runAfter(executor::shutdownNow);
+
+      final ConnectionFactory connectionFactoryDC1A;
+      connectionFactoryDC1A = CFUtil.createConnectionFactory(protocol, DC1_NODEA_URI);
+      CountDownLatch sendDone = new CountDownLatch(1); // released when the producer loop exits
+      CountDownLatch killSend = new CountDownLatch(1); // released once the first 100 messages are sent
+
+      executor.execute(() -> {
+         int messagesSent = 0;
+         while (running.get() && messagesSent < numberOfMessages) { // a new connection per iteration; a failed iteration retries the same counter range
+            try (Connection connection = connectionFactoryDC1A.createConnection()) {
+               Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+               Queue queue = session.createQueue(QUEUE_NAME);
+               MessageProducer producer = session.createProducer(queue);
+               if (messagesSent < 100) {
+                  transactSend(session, producer, messagesSent, 1, 1); // one message per transaction; a factor of 1 makes each of these a large message
+                  messagesSent++;
+                  logger.debug("Sent {}", messagesSent);
+                  if (messagesSent == 100) {
+                     logger.debug("Signal to kill");
+                     killSend.countDown();
+                  }
+               } else {
+                  transactSend(session, producer, messagesSent, 100, largeMessageFactor); // afterwards: batches of 100 per transaction
+                  messagesSent += 100;
+                  logger.debug("Sent {}", messagesSent);
+               }
+            } catch (Throwable e) {
+               logger.warn(e.getMessage(), e);
+               try {
+                  Thread.sleep(100); // brief pause before retrying
+               } catch (Throwable ignored) {
+               }
+            }
+         }
+         sendDone.countDown();
+      });
+
+      Assert.assertTrue(killSend.await(50, TimeUnit.SECONDS));
+
+      restartDC1_ServerA(); // kill and restart DC1 while the producer is still sending
+
+      Assert.assertTrue(sendDone.await(50, TimeUnit.SECONDS));
+
+      SimpleManagement simpleManagementDC1A = new SimpleManagement(DC1_NODEA_URI, null, null);
+      SimpleManagement simpleManagementDC2A = new SimpleManagement(DC2_NODEA_URI, null, null);
+
+      Wait.assertEquals(0, () -> getCount(simpleManagementDC1A, snfQueue)); // nothing left pending in the DC1 mirror queue
+      Wait.assertEquals(numberOfMessages, () -> getCount(simpleManagementDC1A, QUEUE_NAME));
+      Wait.assertEquals(numberOfMessages, () -> getCount(simpleManagementDC2A, QUEUE_NAME));
+
+      CountDownLatch latchKill = new CountDownLatch(consumers); // one count per consumer, released when that consumer reaches 10 messages
+
+      CountDownLatch latchDone = new CountDownLatch(consumers); // one count per consumer, released when its loop exits
+
+      Runnable runnableConsumer = () -> {
+         int messagesConsumed = 0;
+         while (running.get() && messagesConsumed < messagesPerConsumer) { // reconnects after a failure and keeps its running total
+            try (Connection connection = connectionFactoryDC1A.createConnection()) {
+               Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+               Queue queue = session.createQueue(QUEUE_NAME);
+               MessageConsumer consumer = session.createConsumer(queue);
+               connection.start();
+               while (messagesConsumed < messagesPerConsumer) {
+                  Message message = consumer.receive(100);
+                  if (message instanceof TextMessage) {
+                     logger.debug("message received={}", message);
+                     session.commit(); // each message is committed in its own transaction
+                     messagesConsumed++;
+                     logger.debug("Received {}", messagesConsumed);
+                     if (messagesConsumed == 10) {
+                        latchKill.countDown();
+                     }
+                  } else {
+                     logger.info("no messages...");
+                  }
+               }
+            } catch (Throwable e) {
+               logger.warn(e.getMessage(), e);
+               try {
+                  Thread.sleep(100); // brief pause before reconnecting
+               } catch (Throwable ignored) {
+               }
+            }
+         }
+         latchDone.countDown();
+      };
+
+      for (int i = 0; i < consumers; i++) {
+         executor.execute(runnableConsumer);
+      }
+
+      Assert.assertTrue(latchKill.await(10, TimeUnit.SECONDS));
+
+      restartDC1_ServerA(); // second kill and restart, this time while the consumers are acknowledging
+
+      Assert.assertTrue(latchDone.await(4, TimeUnit.MINUTES));
+
+      long flushedMessages = 0;
+
+      try (Connection connection = connectionFactoryDC1A.createConnection()) { // drain whatever the consumers left on DC1
+         Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+         Queue queue = session.createQueue(QUEUE_NAME);
+         MessageConsumer consumer = session.createConsumer(queue);
+         connection.start();
+         while (consumer.receive(500) != null) {
+            flushedMessages++;
+         }
+         session.commit();
+      }
+
+      logger.debug("Flushed {}", flushedMessages);
+
+      // once everything is flushed, the DC1 mirror queue and the DC2 copy of the queue must both reach 0
+
+      Wait.assertEquals(0, () -> getCount(simpleManagementDC1A, snfQueue));
+      Wait.assertEquals(0, () -> getCount(simpleManagementDC2A, QUEUE_NAME));
+   }
+
+   private void restartDC1_ServerA() throws Exception { // forcibly kills the DC1 broker process and starts it again
+      processDC1_node_A.destroyForcibly();
+      Assert.assertTrue(processDC1_node_A.waitFor(10, TimeUnit.SECONDS)); // the old process must exit within 10 seconds
+      processDC1_node_A = startServer(DC1_NODE_A, -1, -1, new File(getServerLocation(DC1_NODE_A), "broker.properties"));
+      ServerUtil.waitForServerToStart(0, 10_000);
+   }
+
+   public long getCount(SimpleManagement simpleManagement, String queue) throws Exception { // returns the queue's message count from the management client, logging it at debug
+      long value = simpleManagement.getMessageCountOnQueue(queue);
+      logger.debug("count on queue {} is {}", queue, value);
+      return value;
+   }
+}
diff --git a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/sender/SenderSoakTest.java b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/sender/SenderSoakTest.java
new file mode 100644
index 0000000000..ae0a4364dc
--- /dev/null
+++ b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/sender/SenderSoakTest.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.soak.brokerConnection.sender;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.io.File;
+import java.io.StringWriter;
+import java.lang.invoke.MethodHandles;
+import java.util.Properties;
+
+import org.apache.activemq.artemis.tests.soak.SoakTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.util.ServerUtil;
+import org.apache.activemq.artemis.utils.cli.helper.HelperCreate;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SenderSoakTest extends SoakTestBase {
+
+   private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   private static String largeBody;
+   private static String smallBody = "This is a small body";
+
+   static { // static initializer: largeBody is a static field, so build it once at class load rather than on every instantiation
+      StringWriter writer = new StringWriter();
+      while (writer.getBuffer().length() < 1024 * 1024) { // keep appending until at least 1024 * 1024 chars are buffered
+         writer.append("This is a large string ..... ");
+      }
+      largeBody = writer.toString();
+   }
+
+   public static final String DC1_NODE_A = "sender/DC1/A";
+   public static final String DC2_NODE_A = "sender/DC2/A";
+
+   Process processDC1_node_A;
+   Process processDC2_node_A;
+
+   private static String DC1_NODEA_URI = "tcp://localhost:61616";
+   private static String DC2_NODEA_URI = "tcp://localhost:61618";
+
+   private static void createServer(String serverName, int portOffset) throws Exception { // (re)creates one broker instance named after serverName, with portOffset handed to HelperCreate
+      File serverLocation = getFileServerLocation(serverName);
+      deleteDirectory(serverLocation); // always recreate the instance from scratch
+
+      HelperCreate cliCreateServer = new HelperCreate();
+      cliCreateServer.setAllowAnonymous(true).setNoWeb(true).setArtemisInstance(serverLocation);
+      cliCreateServer.setMessageLoadBalancing("STRICT");
+      cliCreateServer.setClustered(false);
+      // each broker is named after its own instance (previously every broker was created with the DC1 name)
+      cliCreateServer.setArgs("--no-stomp-acceptor", "--no-hornetq-acceptor", "--no-mqtt-acceptor", "--no-amqp-acceptor", "--max-hops", "1", "--name", serverName);
+      cliCreateServer.addArgs("--addresses", "order");
+      cliCreateServer.addArgs("--queues", "myQueue");
+      cliCreateServer.setPortOffset(portOffset);
+      cliCreateServer.createServer();
+   }
+
+   public static void createServers(boolean useMirror) throws Exception { // creates DC1 with an outgoing AMQP connection to DC2 (MIRROR or SENDER element), then a plain DC2
+      createServer(DC1_NODE_A, 0);
+
+      if (useMirror) { // mirror everything from DC1 to the broker at localhost:61618
+         Properties brokerProperties = new Properties();
+         brokerProperties.put("AMQPConnections.sender.uri", "tcp://localhost:61618");
+         brokerProperties.put("AMQPConnections.sender.retryInterval", "100");
+         brokerProperties.put("AMQPConnections.sender.connectionElements.sender.type", "MIRROR");
+         brokerProperties.put("largeMessageSync", "false");
+         File brokerPropertiesFile = new File(getServerLocation(DC1_NODE_A), "broker.properties");
+         saveProperties(brokerProperties, brokerPropertiesFile);
+      } else { // same connection, but a SENDER element bound to myQueue instead of a mirror
+         Properties brokerProperties = new Properties();
+         brokerProperties.put("AMQPConnections.sender.uri", "tcp://localhost:61618");
+         brokerProperties.put("AMQPConnections.sender.retryInterval", "100");
+         brokerProperties.put("AMQPConnections.sender.connectionElements.sender.type", "SENDER");
+         brokerProperties.put("AMQPConnections.sender.connectionElements.sender.queueName", "myQueue");
+         brokerProperties.put("largeMessageSync", "false");
+         File brokerPropertiesFile = new File(getServerLocation(DC1_NODE_A), "broker.properties");
+         saveProperties(brokerProperties, brokerPropertiesFile);
+      }
+
+      createServer(DC2_NODE_A, 2); // DC2 gets no broker.properties of its own
+   }
+
+   private void startServers() throws Exception { // launches DC2 first, then DC1 with its broker.properties, and waits on both
+      processDC2_node_A = startServer(DC2_NODE_A, -1, -1);
+      processDC1_node_A = startServer(DC1_NODE_A, -1, -1, new File(getServerLocation(DC1_NODE_A), "broker.properties"));
+
+      ServerUtil.waitForServerToStart(0, 10_000); // NOTE(review): first argument presumably identifies the broker by id/port offset (0 = DC1) - confirm against ServerUtil
+      ServerUtil.waitForServerToStart(2, 10_000); // 2 matches the port offset DC2 was created with
+   }
+
+   @Test
+   public void testMirror() throws Exception { // runs the send/receive scenario with the MIRROR connection element
+      testSender(true);
+   }
+
+   @Test
+   public void testSender() throws Exception { // runs the send/receive scenario with the SENDER connection element
+      testSender(false);
+   }
+
+   public void testSender(boolean mirror) throws Exception { // sends numberOfMessages to myQueue on DC1 and expects to receive all of them from myQueue on DC2
+      createServers(mirror); // mirror=true configures a MIRROR element, false a SENDER element
+      startServers();
+
+      final int numberOfMessages = 1000;
+
+      Assert.assertTrue("numberOfMessages must be even", numberOfMessages % 2 == 0);
+
+      ConnectionFactory connectionFactoryDC1A = CFUtil.createConnectionFactory("amqp", "tcp://localhost:61616");
+      ConnectionFactory connectionFactoryDC2A = CFUtil.createConnectionFactory("amqp", "tcp://localhost:61618");
+
+      try (Connection connection = connectionFactoryDC1A.createConnection()) {
+         Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+         Queue queue = session.createQueue("myQueue");
+         MessageProducer producer = session.createProducer(queue);
+
+         for (int i = 0; i < numberOfMessages; i++) {
+            TextMessage message;
+            boolean large;
+            if (i % 10 == 0) { // every tenth message carries the large body (was "i % 1 == 10", which can never be true)
+               message = session.createTextMessage(largeBody);
+               large = true;
+            } else {
+               message = session.createTextMessage(smallBody);
+               large = false;
+            }
+            message.setIntProperty("i", i);
+            message.setBooleanProperty("large", large);
+            producer.send(message);
+            if (i % 100 == 0) { // intermediate commit every 100 messages
+               logger.debug("commit {}", i);
+               session.commit();
+            }
+         }
+         session.commit(); // commit the tail that followed the last intermediate commit
+      }
+
+      logger.debug("All messages were sent");
+
+      try (Connection connection = connectionFactoryDC2A.createConnection()) { // everything sent to DC1 must be consumable from DC2
+         connection.start();
+         Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+         Queue queue = session.createQueue("myQueue");
+         MessageConsumer consumer = session.createConsumer(queue);
+
+         for (int i = 0; i < numberOfMessages; i++) {
+            TextMessage message = (TextMessage) consumer.receive(5000);
+            Assert.assertNotNull(message);
+            logger.debug("Received message {}, large={}", message.getIntProperty("i"), message.getBooleanProperty("large"));
+         }
+         session.commit();
+      }
+   }
+
+}