You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2022/06/30 21:37:33 UTC

[activemq-artemis] branch main updated (2f361b1d40 -> da38fcce71)

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


    from 2f361b1d40 ARTEMIS-3878: Update to Derby test dep to 10.14.2.0
     new d90179b99c ARTEMIS-3815 proper retry through IOCompletion when message not found on target queue on Mirror
     new da38fcce71 NO-JIRA reformat AMQPMirrorControllerTarget

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../activemq/artemis/core/io/RunnableCallback.java |  64 ++++++
 .../protocol/amqp/broker/AMQPSessionCallback.java  |   4 +
 .../connect/mirror/AMQPMirrorControllerTarget.java | 148 +++++++------
 .../amqp/connect/AMQPMirrorFastACKTest.java        | 239 +++++++++++++++++++++
 4 files changed, 379 insertions(+), 76 deletions(-)
 create mode 100644 artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/RunnableCallback.java
 create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPMirrorFastACKTest.java


[activemq-artemis] 02/02: NO-JIRA reformat AMQPMirrorControllerTarget

Posted by cl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit da38fcce712cbe3fef4815c9a763cc04deb5f5d5
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Thu Jun 30 14:52:52 2022 -0400

    NO-JIRA reformat AMQPMirrorControllerTarget
---
 .../connect/mirror/AMQPMirrorControllerTarget.java | 93 +++++++++-------------
 1 file changed, 37 insertions(+), 56 deletions(-)

diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
index 2c652e352b..9bb5cf527c 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
@@ -77,26 +77,29 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
 
    private static final Logger logger = Logger.getLogger(AMQPMirrorControllerTarget.class);
 
-   private static ThreadLocal<MirrorController> controllerThreadLocal = new ThreadLocal<>();
+   private static final ThreadLocal<MirrorController> CONTROLLER_THREAD_LOCAL = new ThreadLocal<>();
 
    public static void setControllerInUse(MirrorController controller) {
-      controllerThreadLocal.set(controller);
+      CONTROLLER_THREAD_LOCAL.set(controller);
    }
 
    public static MirrorController getControllerInUse() {
-      return controllerThreadLocal.get();
+      return CONTROLLER_THREAD_LOCAL.get();
    }
 
-   /** Objects of this class can be used by either transaction or by OperationContext.
-    *  It is important that when you're using the transactions you clear any references to
-    *  the operation context. Don't use transaction and OperationContext at the same time
-    *  as that would generate duplicates on the objects cache.
+   /**
+    * Objects of this class can be used by either transaction or by OperationContext.
+    * It is important that when you're using the transactions you clear any references to
+    * the operation context. Don't use transaction and OperationContext at the same time
+    * as that would generate duplicates on the objects cache.
     */
    class ACKMessageOperation implements IOCallback, Runnable {
 
       Delivery delivery;
 
-      /** notice that when you use the Transaction, you need to make sure you don't use the IO*/
+      /**
+       * notice that when you use the Transaction, you need to make sure you don't use the IO
+       */
       public TransactionOperationAbstract tx = new TransactionOperationAbstract() {
          @Override
          public void afterCommit(Transaction tx) {
@@ -135,7 +138,7 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
 
       @Override
       public void onError(int errorCode, String errorMessage) {
-         logger.warn(errorMessage + "-"  + errorMessage);
+         logger.warn(errorCode + "-" + errorMessage);
       }
    }
 
@@ -181,10 +184,8 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
    @Override
    protected void actualDelivery(AMQPMessage message, Delivery delivery, Receiver receiver, Transaction tx) {
       recoverContext();
-
       incrementSettle();
 
-
       if (logger.isTraceEnabled()) {
          logger.trace(server + "::actualdelivery call for " + message);
       }
@@ -204,29 +205,20 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
          if (eventType != null) {
             if (eventType.equals(ADD_ADDRESS)) {
                AddressInfo addressInfo = parseAddress(message);
-               if (logger.isDebugEnabled()) {
-                  logger.debug(server + " Adding Address " + addressInfo);
-               }
+
                addAddress(addressInfo);
             } else if (eventType.equals(DELETE_ADDRESS)) {
                AddressInfo addressInfo = parseAddress(message);
-               if (logger.isDebugEnabled()) {
-                  logger.debug(server + " Removing Address " + addressInfo);
-               }
+
                deleteAddress(addressInfo);
             } else if (eventType.equals(CREATE_QUEUE)) {
                QueueConfiguration queueConfiguration = parseQueue(message);
-               if (logger.isDebugEnabled()) {
-                  logger.debug(server + " Creating queue " + queueConfiguration);
-               }
+
                createQueue(queueConfiguration);
             } else if (eventType.equals(DELETE_QUEUE)) {
-
                String address = (String) AMQPMessageBrokerAccessor.getMessageAnnotationProperty(message, ADDRESS);
                String queueName = (String) AMQPMessageBrokerAccessor.getMessageAnnotationProperty(message, QUEUE);
-               if (logger.isDebugEnabled()) {
-                  logger.debug(server + " Deleting queue " + queueName + " on address " + address);
-               }
+
                deleteQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName));
             } else if (eventType.equals(POST_ACK)) {
                String nodeID = (String) AMQPMessageBrokerAccessor.getMessageAnnotationProperty(message, BROKER_ID);
@@ -239,17 +231,11 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
                String queueName = (String) AMQPMessageBrokerAccessor.getMessageAnnotationProperty(message, QUEUE);
                AmqpValue value = (AmqpValue) message.getBody();
                Long messageID = (Long) value.getValue();
-               if (logger.isDebugEnabled()) {
-                  logger.debug(server + " Post ack queueName = " + queueName + " messageID=" + messageID + ", nodeID=" + nodeID);
-               }
                if (postAcknowledge(queueName, nodeID, messageID, messageAckOperation, ackReason)) {
                   messageAckOperation = null;
                }
             }
          } else {
-            if (logger.isDebugEnabled()) {
-               logger.debug(server + " Sending message " + message);
-            }
             if (sendMessage(message, messageAckOperation)) {
                // since the send was successful, we give up the reference here,
                // so there won't be any call on afterCompleteOperations
@@ -269,7 +255,6 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
    @Override
    public void initialize() throws Exception {
       super.initialize();
-      org.apache.qpid.proton.amqp.messaging.Target target = (org.apache.qpid.proton.amqp.messaging.Target) receiver.getRemoteTarget();
 
       // Match the settlement mode of the remote instead of relying on the default of MIXED.
       receiver.setSenderSettleMode(receiver.getRemoteSenderSettleMode());
@@ -279,24 +264,22 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
       flow();
    }
 
-   private QueueConfiguration parseQueue(AMQPMessage message) throws Exception {
-      AmqpValue bodyvalue = (AmqpValue) message.getBody();
-      String body = (String) bodyvalue.getValue();
-      QueueConfiguration queueConfiguration = QueueConfiguration.fromJSON(body);
-      return queueConfiguration;
+   private QueueConfiguration parseQueue(AMQPMessage message) {
+      AmqpValue bodyValue = (AmqpValue) message.getBody();
+      String body = (String) bodyValue.getValue();
+      return QueueConfiguration.fromJSON(body);
    }
 
-   private AddressInfo parseAddress(AMQPMessage message) throws Exception {
-      AmqpValue bodyvalue = (AmqpValue) message.getBody();
-      String body = (String) bodyvalue.getValue();
-      AddressInfo addressInfo = AddressInfo.fromJSON(body);
-      return addressInfo;
+   private AddressInfo parseAddress(AMQPMessage message) {
+      AmqpValue bodyValue = (AmqpValue) message.getBody();
+      String body = (String) bodyValue.getValue();
+      return AddressInfo.fromJSON(body);
    }
 
    @Override
    public void addAddress(AddressInfo addressInfo) throws Exception {
       if (logger.isDebugEnabled()) {
-         logger.debug(server + " Adding address " + addressInfo);
+         logger.debug(server + " adding address " + addressInfo);
       }
       server.addAddressInfo(addressInfo);
    }
@@ -319,12 +302,12 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
    @Override
    public void createQueue(QueueConfiguration queueConfiguration) throws Exception {
       if (logger.isDebugEnabled()) {
-         logger.debug(server + " Adding queue " + queueConfiguration);
+         logger.debug(server + " adding queue " + queueConfiguration);
       }
       try {
          server.createQueue(queueConfiguration, true);
-      } catch (Exception ignored) {
-         logger.debug("Queue could not be created, already existed " + queueConfiguration, ignored);
+      } catch (Exception e) {
+         logger.debug("Queue could not be created, already existed " + queueConfiguration, e);
       }
    }
 
@@ -334,13 +317,17 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
          logger.debug(server + " destroy queue " + queueName + " on address = " + addressName + " server " + server.getIdentity());
       }
       try {
-         server.destroyQueue(queueName,null, false, true, false, false);
+         server.destroyQueue(queueName, null, false, true, false, false);
       } catch (ActiveMQNonExistentQueueException expected) {
          logger.debug(server + " queue " + queueName + " was previously removed", expected);
       }
    }
 
-   public boolean postAcknowledge(String queue, String nodeID, long messageID, ACKMessageOperation ackMessage, AckReason reason) throws Exception {
+   public boolean postAcknowledge(String queue,
+                                  String nodeID,
+                                  long messageID,
+                                  ACKMessageOperation ackMessage,
+                                  AckReason reason) throws Exception {
       final Queue targetQueue = server.locateQueue(queue);
 
       if (targetQueue == null) {
@@ -447,13 +434,10 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
          internalID = internalIDLong;
       }
 
-
       if (logger.isTraceEnabled()) {
-         logger.trace("sendMessage on server " + server + " for message " + message +
-                         " with internalID = " + internalIDLong + " mirror id " + internalMirrorID);
+         logger.trace("sendMessage on server " + server + " for message " + message + " with internalID = " + internalIDLong + " mirror id " + internalMirrorID);
       }
 
-
       routingContext.setDuplicateDetection(false); // we do our own duplicate detection here
 
       DuplicateIDCache duplicateIDCache;
@@ -499,16 +483,14 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
       return true;
    }
 
-   /**
-    * @param ref
-    * @param reason
-    */
    @Override
    public void postAcknowledge(MessageReference ref, AckReason reason) {
+      // Do nothing
    }
 
    @Override
    public void sendMessage(Message message, RoutingContext context, List<MessageReference> refs) {
+      // Do nothing
    }
 
    class PageAck implements ToIntFunction<PagedReference>, BooleanSupplier, Runnable {
@@ -527,7 +509,6 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
 
       /**
        * Method to retry the ack before a scan
-       * @return
        */
       @Override
       public boolean getAsBoolean() {


[activemq-artemis] 01/02: ARTEMIS-3815 proper retry through IOCompletion when message not found on target queue on Mirror

Posted by cl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit d90179b99cc217a2611a7d439ddcc9a27d79dd12
Author: iliya <il...@gmail.com>
AuthorDate: Thu Jun 30 14:40:46 2022 -0400

    ARTEMIS-3815 proper retry through IOCompletion when message not found on target queue on Mirror
    
    co-authored Clebert Suconic
---
 .../activemq/artemis/core/io/RunnableCallback.java |  64 ++++++
 .../protocol/amqp/broker/AMQPSessionCallback.java  |   4 +
 .../connect/mirror/AMQPMirrorControllerTarget.java |  59 +++--
 .../amqp/connect/AMQPMirrorFastACKTest.java        | 239 +++++++++++++++++++++
 4 files changed, 344 insertions(+), 22 deletions(-)

diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/RunnableCallback.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/RunnableCallback.java
new file mode 100644
index 0000000000..cb4344a626
--- /dev/null
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/RunnableCallback.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.io;
+
+import org.jboss.logging.Logger;
+
+public class RunnableCallback implements IOCallback {
+   private static final Logger logger = Logger.getLogger(RunnableCallback.class);
+
+   Runnable okCallback;
+   Runnable errorCallback;
+
+   public RunnableCallback(Runnable ok, Runnable error) {
+      if (ok == null) {
+         throw new NullPointerException("ok = null");
+      }
+      if (ok == null) {
+         throw new NullPointerException("error = null");
+      }
+      okCallback = ok;
+      errorCallback = error;
+   }
+
+   public RunnableCallback(Runnable ok) {
+      if (ok == null) {
+         throw new NullPointerException("ok = null");
+      }
+      okCallback = ok;
+      errorCallback = ok;
+   }
+
+   @Override
+   public void done() {
+      try {
+         okCallback.run();
+      } catch (Throwable e) {
+         logger.warn(e.getMessage(), e);
+      }
+   }
+
+   @Override
+   public void onError(int errorCode, String errorMessage) {
+      try {
+         errorCallback.run();
+      } catch (Throwable e) {
+         logger.warn(e.getMessage(), e);
+      }
+   }
+}
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index c78883afff..adb61c107d 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -177,6 +177,10 @@ public class AMQPSessionCallback implements SessionCallback {
       }
    }
 
+   public OperationContext getSessionContext() {
+      return serverSession.getSessionContext();
+   }
+
    @Override
    public void browserFinished(ServerConsumer consumer) {
 
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
index 4be5968123..2c652e352b 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
@@ -26,7 +26,9 @@ import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.QueueConfiguration;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.io.IOCallback;
+import org.apache.activemq.artemis.core.io.RunnableCallback;
 import org.apache.activemq.artemis.core.paging.cursor.PagedReference;
+import org.apache.activemq.artemis.core.persistence.OperationContext;
 import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
 import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
@@ -138,7 +140,7 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
    }
 
    // in a regular case we should not have more than amqpCredits on the pool, that's the max we would need
-   private final MpscPool<ACKMessageOperation> ackMessageMpscPool = new MpscPool<>(amqpCredits, ACKMessageOperation::reset, () -> new ACKMessageOperation());
+   private final MpscPool<ACKMessageOperation> ackMessageMpscPool = new MpscPool<>(amqpCredits, ACKMessageOperation::reset, ACKMessageOperation::new);
 
    final RoutingContextImpl routingContext = new RoutingContextImpl(null);
 
@@ -151,6 +153,8 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
 
    private final ReferenceNodeStore referenceNodeStore;
 
+   OperationContext mirrorContext;
+
    public AMQPMirrorControllerTarget(AMQPSessionCallback sessionSPI,
                                      AMQPConnectionContext connection,
                                      AMQPSessionContext protonSession,
@@ -161,6 +165,7 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
       this.basicController.setLink(receiver);
       this.server = server;
       this.referenceNodeStore = sessionSPI.getProtocolManager().getReferenceIDSupplier();
+      mirrorContext = protonSession.getSessionSPI().getSessionContext();
    }
 
    @Override
@@ -224,7 +229,6 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
                }
                deleteQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName));
             } else if (eventType.equals(POST_ACK)) {
-               String address = (String) AMQPMessageBrokerAccessor.getMessageAnnotationProperty(message, ADDRESS);
                String nodeID = (String) AMQPMessageBrokerAccessor.getMessageAnnotationProperty(message, BROKER_ID);
 
                AckReason ackReason = AMQPMessageBrokerAccessor.getMessageAnnotationAckReason(message);
@@ -236,9 +240,9 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
                AmqpValue value = (AmqpValue) message.getBody();
                Long messageID = (Long) value.getValue();
                if (logger.isDebugEnabled()) {
-                  logger.debug(server + " Post ack address=" + address + " queueName = " + queueName + " messageID=" + messageID + ", nodeID=" + nodeID);
+                  logger.debug(server + " Post ack queueName = " + queueName + " messageID=" + messageID + ", nodeID=" + nodeID);
                }
-               if (postAcknowledge(address, queueName, nodeID, messageID, messageAckOperation, ackReason)) {
+               if (postAcknowledge(queueName, nodeID, messageID, messageAckOperation, ackReason)) {
                   messageAckOperation = null;
                }
             }
@@ -336,7 +340,7 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
       }
    }
 
-   public boolean postAcknowledge(String address, String queue, String nodeID, long messageID, ACKMessageOperation ackMessage, AckReason reason) throws Exception {
+   public boolean postAcknowledge(String queue, String nodeID, long messageID, ACKMessageOperation ackMessage, AckReason reason) throws Exception {
       final Queue targetQueue = server.locateQueue(queue);
 
       if (targetQueue == null) {
@@ -355,32 +359,50 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
          logger.trace("Server " + server.getIdentity() + " with queue = " + queue + " being acked for " + messageID + " coming from " + messageID + " targetQueue = " + targetQueue);
       }
 
-      performAck(nodeID, messageID, targetQueue, ackMessage, reason, true);
+      performAck(nodeID, messageID, targetQueue, ackMessage, reason, (short)0);
       return true;
    }
 
-
    public void performAckOnPage(String nodeID, long messageID, Queue targetQueue, IOCallback ackMessageOperation) {
       PageAck pageAck = new PageAck(targetQueue, nodeID, messageID, ackMessageOperation);
       targetQueue.getPageSubscription().scanAck(pageAck, pageAck, pageAck, pageAck);
    }
 
-   private void performAck(String nodeID, long messageID, Queue targetQueue, ACKMessageOperation ackMessageOperation, AckReason reason, boolean retry) {
+   private void performAck(String nodeID, long messageID, Queue targetQueue, ACKMessageOperation ackMessageOperation, AckReason reason, final short retry) {
       if (logger.isTraceEnabled()) {
          logger.trace("performAck (nodeID=" + nodeID + ", messageID=" + messageID + ")" + ", targetQueue=" + targetQueue.getName());
       }
       MessageReference reference = targetQueue.removeWithSuppliedID(nodeID, messageID, referenceNodeStore);
 
-      if (reference == null && retry) {
+      if (reference == null) {
          if (logger.isDebugEnabled()) {
-            logger.debug("Retrying Reference not found on messageID=" + messageID + " nodeID=" + nodeID);
+            logger.debug("Retrying Reference not found on messageID=" + messageID + " nodeID=" + nodeID + ", currentRetry=" + retry);
+         }
+         switch (retry) {
+            case 0:
+               // first retry, after IO Operations
+               sessionSPI.getSessionContext().executeOnCompletion(new RunnableCallback(() -> performAck(nodeID, messageID, targetQueue, ackMessageOperation, reason, (short) 1)));
+               return;
+            case 1:
+               // second retry after the queue is flushed the temporary adds
+               targetQueue.flushOnIntermediate(() -> {
+                  recoverContext();
+                  performAck(nodeID, messageID, targetQueue, ackMessageOperation, reason, (short)2);
+               });
+               return;
+            case 2:
+               // third retry, on paging
+               if (reason != AckReason.EXPIRED) {
+                  // if expired, we don't need to check on paging
+                  // as the message will expire again when depaged (if on paging)
+                  performAckOnPage(nodeID, messageID, targetQueue, ackMessageOperation);
+                  return;
+               } else {
+                  ackMessageOperation.run();
+               }
          }
-         targetQueue.flushOnIntermediate(() -> {
-            recoverContext();
-            performAck(nodeID, messageID, targetQueue, ackMessageOperation, reason, false);
-         });
-         return;
       }
+
       if (reference != null) {
          if (logger.isTraceEnabled()) {
             logger.trace("Post ack Server " + server + " worked well for messageID=" + messageID + " nodeID=" + nodeID);
@@ -398,14 +420,7 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
          } catch (Exception e) {
             logger.warn(e.getMessage(), e);
          }
-      } else {
-         if (reason != AckReason.EXPIRED) {
-            // if expired, we don't need to check on paging
-            // as the message will expire again when depaged (if on paging)
-            performAckOnPage(nodeID, messageID, targetQueue, ackMessageOperation);
-         }
       }
-
    }
 
    /**
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPMirrorFastACKTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPMirrorFastACKTest.java
new file mode 100644
index 0000000000..3fda73e495
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPMirrorFastACKTest.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp.connect;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
+import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
+import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration;
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
+import org.apache.activemq.artemis.core.journal.Journal;
+import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
+import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
+import org.apache.activemq.artemis.core.persistence.Persister;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPStandardMessage;
+import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
+import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager;
+import org.apache.activemq.artemis.spi.core.security.jaas.InVMLoginModule;
+import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.utils.ExecutorFactory;
+import org.apache.activemq.artemis.utils.Wait;
+import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class AMQPMirrorFastACKTest extends AmqpClientTestSupport {
+
+   private static final String SLOW_SERVER_NAME = "slow";
+   private static final int SLOW_SERVER_PORT = AMQP_PORT + 1;
+
+   private static final int ENCODE_DELAY = 10;
+
+   private ActiveMQServer slowServer;
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+      slowServer = createSlowServer();
+   }
+
+   @Override
+   @After
+   public void tearDown() throws Exception {
+      try {
+         if (slowServer != null) {
+            slowServer.stop();
+         }
+      } finally {
+         super.tearDown();
+      }
+   }
+
+   @Test
+   public void testMirrorTargetFastACK() throws Exception {
+      final int NUMBER_OF_MESSAGES = 10;
+      CountDownLatch done = new CountDownLatch(NUMBER_OF_MESSAGES);
+
+      AMQPMirrorBrokerConnectionElement replication = configureMirrorTowardsSlow(server);
+
+      slowServer.start();
+      server.start();
+
+      waitForServerToStart(slowServer);
+      waitForServerToStart(server);
+
+      server.addAddressInfo(new AddressInfo(getQueueName()).addRoutingType(RoutingType.ANYCAST).setAutoCreated(false));
+      server.createQueue(new QueueConfiguration(getQueueName()).setRoutingType(RoutingType.ANYCAST).setAddress(getQueueName()).setAutoCreated(false));
+
+      ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
+
+      try (Connection connection = factory.createConnection()) {
+         Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+         MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName()));
+         MessageProducer producer = session.createProducer(session.createQueue(getQueueName()));
+
+         connection.start();
+
+         consumer.setMessageListener(new MessageListener() {
+            @Override
+            public void onMessage(Message message) {
+               try {
+                  message.acknowledge();
+                  done.countDown();
+               } catch (Exception ignore) {
+                  // Ignore
+               }
+            }
+         });
+
+         producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+         for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+            producer.send(session.createTextMessage("i=" + i));
+         }
+
+         Assert.assertTrue(done.await(5000, TimeUnit.MILLISECONDS));
+      }
+
+      Queue snf = server.locateQueue(replication.getMirrorSNF());
+      Queue queue = slowServer.locateQueue(getQueueName());
+
+      Wait.waitFor(() -> snf.getMessageCount() == 0 && snf.getMessagesAdded() > NUMBER_OF_MESSAGES);
+      Wait.assertTrue("Expected mirrored target queue " + getQueueName() + " to be empty", () -> queue.getMessageCount() == 0 && queue.getMessagesAdded() == NUMBER_OF_MESSAGES);
+   }
+
+   @Override
+   protected ActiveMQServer createServer() throws Exception {
+      return createServer(AMQP_PORT, false);
+   }
+
+   private AMQPMirrorBrokerConnectionElement configureMirrorTowardsSlow(ActiveMQServer source) {
+      AMQPBrokerConnectConfiguration connection = new AMQPBrokerConnectConfiguration("mirror", "tcp://localhost:" + SLOW_SERVER_PORT).setReconnectAttempts(-1).setRetryInterval(100);
+      AMQPMirrorBrokerConnectionElement replication = new AMQPMirrorBrokerConnectionElement().setDurable(true);
+      connection.addElement(replication);
+
+      source.getConfiguration().addAMQPConnection(connection);
+      return replication;
+   }
+
+   private ActiveMQServer createSlowServer() throws Exception {
+      ActiveMQSecurityManager securityManager = new ActiveMQJAASSecurityManager(InVMLoginModule.class.getName(), new SecurityConfiguration());
+      ActiveMQServer server = new ActiveMQServerImpl(createBasicConfig(SLOW_SERVER_PORT), mBeanServer, securityManager) {
+         @Override
+         protected StorageManager createStorageManager() {
+            return AMQPMirrorFastACKTest.this.createStorageManager(getConfiguration(), getCriticalAnalyzer(), executorFactory, scheduledPool, ioExecutorFactory, ioCriticalErrorListener);
+         }
+      };
+
+      server.getConfiguration().setName(SLOW_SERVER_NAME);
+      server.getConfiguration().getAcceptorConfigurations().clear();
+      server.getConfiguration().getAcceptorConfigurations().add(addAcceptorConfiguration(slowServer, SLOW_SERVER_PORT));
+
+      server.getConfiguration().setJMXManagementEnabled(true);
+      server.getConfiguration().setMessageExpiryScanPeriod(100);
+
+      configureAddressPolicy(server);
+      configureBrokerSecurity(server);
+
+      return server;
+   }
+
+   private StorageManager createStorageManager(Configuration configuration,
+                                               CriticalAnalyzer criticalAnalyzer,
+                                               ExecutorFactory executorFactory,
+                                               ScheduledExecutorService scheduledPool,
+                                               ExecutorFactory ioExecutorFactory,
+                                               IOCriticalErrorListener ioCriticalErrorListener) {
+      return new JournalStorageManager(configuration, criticalAnalyzer, executorFactory, scheduledPool, ioExecutorFactory, ioCriticalErrorListener) {
+         @Override
+         protected Journal createMessageJournal(Configuration config,
+                                                IOCriticalErrorListener criticalErrorListener,
+                                                int fileSize) {
+            return new JournalImpl(ioExecutorFactory, fileSize, config.getJournalMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), config.getJournalFileOpenTimeout(), journalFF, "activemq-data", "amq", journalFF.getMaxIO(), 0, criticalErrorListener, config.getJournalMaxAtticFiles()) {
+               @Override
+               public void appendAddRecordTransactional(long txID,
+                                                        long id,
+                                                        byte recordType,
+                                                        Persister persister,
+                                                        Object record) throws Exception {
+                  super.appendAddRecordTransactional(txID, id, recordType, record instanceof AMQPStandardMessage ? new SlowMessagePersister<>(persister) : persister, record);
+               }
+            };
+         }
+      };
+   }
+
+   static class SlowMessagePersister<T> implements Persister<T> {
+
+      private final Persister<T> delegate;
+
+      SlowMessagePersister(Persister<T> delegate) {
+         this.delegate = delegate;
+      }
+
+      @Override
+      public byte getID() {
+         return delegate.getID();
+      }
+
+      @Override
+      public int getEncodeSize(T record) {
+         return delegate.getEncodeSize(record);
+      }
+
+      @Override
+      public void encode(ActiveMQBuffer buffer, T record) {
+         try {
+            // This will slow down IO completion for transactional message write
+            Thread.sleep(ENCODE_DELAY);
+         } catch (Exception ignore) {
+            // ignore
+         }
+         delegate.encode(buffer, record);
+      }
+
+      @Override
+      public T decode(ActiveMQBuffer buffer, T record, CoreMessageObjectPools pool) {
+         return delegate.decode(buffer, record, pool);
+      }
+   }
+}