You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2019/07/27 20:26:32 UTC

[activemq-artemis] 01/03: Revert "ARTEMIS-2423 Improving Consumer/Queue Delivery lock"

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 8a1f267bd567b8b5cfa7a6bc5ba1ae192c170a1d
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Sat Jul 27 10:45:08 2019 -0400

    Revert "ARTEMIS-2423 Improving Consumer/Queue Delivery lock"
    
    This reverts commit 7507a9fd4b282523c2b2f3517ed788153a35df4c.
---
 .../protocol/amqp/broker/AMQPSessionCallback.java  |  10 +-
 .../core/protocol/mqtt/MQTTPublishManager.java     |  24 +-
 .../core/protocol/mqtt/MQTTSessionCallback.java    |  14 +-
 .../core/protocol/openwire/amq/AMQSession.java     |  14 +-
 .../artemis/core/protocol/stomp/StompSession.java  |   3 +-
 .../impl/ManagementRemotingConnection.java         |  10 +-
 .../protocol/core/impl/CoreSessionCallback.java    |  10 +-
 .../activemq/artemis/core/server/Consumer.java     |  32 +--
 .../core/server/cluster/impl/BridgeImpl.java       |   2 +-
 .../core/server/cluster/impl/Redistributor.java    |   2 +-
 .../artemis/core/server/impl/QueueImpl.java        | 110 ++++----
 .../core/server/impl/ServerConsumerImpl.java       | 277 ++++++++++++---------
 .../artemis/spi/core/protocol/SessionCallback.java |  10 +-
 .../tests/integration/cli/DummyServerConsumer.java |   2 +-
 .../tests/integration/client/HangConsumerTest.java |  10 +-
 .../tests/integration/jms/client/GroupingTest.java |   1 +
 .../tests/unit/core/server/impl/QueueImplTest.java |  17 +-
 .../unit/core/server/impl/fakes/FakeConsumer.java  |   2 +-
 18 files changed, 282 insertions(+), 268 deletions(-)

diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index 616da5b..f850cc1 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -186,6 +186,11 @@ public class AMQPSessionCallback implements SessionCallback {
                                                         (String) null, this, true, operationContext, manager.getPrefixes());
    }
 
+   @Override
+   public void afterDelivery() throws Exception {
+
+   }
+
    public void start() {
 
    }
@@ -600,11 +605,6 @@ public class AMQPSessionCallback implements SessionCallback {
    }
 
    @Override
-   public void afterDeliver(MessageReference ref, Message message, ServerConsumer consumerID, int deliveryCount) {
-
-   }
-
-   @Override
    public int sendLargeMessage(MessageReference ref,
                                Message message,
                                ServerConsumer consumer,
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
index bb38539..abcfe3f 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
@@ -59,12 +59,6 @@ public class MQTTPublishManager {
 
    private MQTTSessionState.OutboundStore outboundStore;
 
-   /** this is the last qos that happened during delivery.
-    *  since afterDelivery will happen in the same thread, no other threads should be calling delivery and afterDelivery
-    *  so it is safe to keep this value here.
-    */
-   private Integer currentQos;
-
    public MQTTPublishManager(MQTTSession session) {
       this.session = session;
    }
@@ -114,6 +108,7 @@ public class MQTTPublishManager {
    boolean isManagementConsumer(ServerConsumer consumer) {
       return consumer == managementConsumer;
    }
+
    /**
     * Since MQTT Subscriptions can over lap; a client may receive the same message twice.  When this happens the client
     * returns a PubRec or PubAck with ID.  But we need to know which consumer to ack, since we only have the ID to go on we
@@ -124,35 +119,20 @@ public class MQTTPublishManager {
    protected void sendMessage(ICoreMessage message, ServerConsumer consumer, int deliveryCount) throws Exception {
       // This is to allow retries of PubRel.
       if (isManagementConsumer(consumer)) {
-         currentQos = null;
          sendPubRelMessage(message);
       } else {
          int qos = decideQoS(message, consumer);
-         currentQos = qos;
          if (qos == 0) {
             sendServerMessage((int) message.getMessageID(),  message, deliveryCount, qos);
+            session.getServerSession().individualAcknowledge(consumer.getID(), message.getMessageID());
          } else if (qos == 1 || qos == 2) {
             int mqttid = outboundStore.generateMqttId(message.getMessageID(), consumer.getID());
             outboundStore.publish(mqttid, message.getMessageID(), consumer.getID());
             sendServerMessage(mqttid, message, deliveryCount, qos);
          } else {
-            // this will happen during afterDeliver
-         }
-      }
-   }
-
-   protected void confirmMessage(ICoreMessage message, ServerConsumer consumer, int deliveryCount) throws Exception {
-      if (currentQos != null) {
-         int qos = currentQos.intValue();
-         if (qos == 0) {
-            session.getServerSession().individualAcknowledge(consumer.getID(), message.getMessageID());
-         } else if (qos == 1 || qos == 2) {
-            // everything happened in delivery
-         } else {
             // Client must have disconnected and it's Subscription QoS cleared
             consumer.individualCancel(message.getMessageID(), false);
          }
-
       }
    }
 
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
index 168b7fa..50d5732 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
@@ -60,15 +60,6 @@ public class MQTTSessionCallback implements SessionCallback {
    }
 
    @Override
-   public void afterDeliver(MessageReference ref, Message message, ServerConsumer consumer, int deliveryCount) {
-      try {
-         session.getMqttPublishManager().confirmMessage(message.toCore(), consumer, deliveryCount);
-      } catch (Exception e) {
-         log.warn("Unable to send message: " + message.getMessageID() + " Cause: " + e.getMessage(), e);
-      }
-   }
-
-   @Override
    public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed) {
       return false;
    }
@@ -100,6 +91,11 @@ public class MQTTSessionCallback implements SessionCallback {
    }
 
    @Override
+   public void afterDelivery() throws Exception {
+
+   }
+
+   @Override
    public void browserFinished(ServerConsumer consumer) {
 
    }
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index e4ecd48..b780563 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -270,6 +270,12 @@ public class AMQSession implements SessionCallback {
 
    }
 
+   // rename actualDest to destination
+   @Override
+   public void afterDelivery() throws Exception {
+
+   }
+
    @Override
    public void browserFinished(ServerConsumer consumer) {
       AMQConsumer theConsumer = (AMQConsumer) consumer.getProtocolData();
@@ -307,14 +313,6 @@ public class AMQSession implements SessionCallback {
    }
 
    @Override
-   public void afterDeliver(MessageReference ref,
-                            org.apache.activemq.artemis.api.core.Message message,
-                            ServerConsumer consumerID,
-                            int deliveryCount) {
-
-   }
-
-   @Override
    public int sendLargeMessage(MessageReference reference,
                                org.apache.activemq.artemis.api.core.Message message,
                                ServerConsumer consumerID,
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
index 03f9b44..80bbbe8 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
@@ -110,9 +110,8 @@ public class StompSession implements SessionCallback {
    public void sendProducerCreditsFailMessage(int credits, SimpleString address) {
    }
 
-
    @Override
-   public void afterDeliver(MessageReference ref, Message message, ServerConsumer consumerID, int deliveryCount) throws Exception {
+   public void afterDelivery() throws Exception {
       PendingTask task;
       while ((task = afterDeliveryTasks.poll()) != null) {
          task.run();
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ManagementRemotingConnection.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ManagementRemotingConnection.java
index 35eab8a..7e760c1 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ManagementRemotingConnection.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ManagementRemotingConnection.java
@@ -212,6 +212,11 @@ public class ManagementRemotingConnection implements RemotingConnection {
       }
 
       @Override
+      public void afterDelivery() throws Exception {
+
+      }
+
+      @Override
       public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed) {
          return false;
       }
@@ -232,11 +237,6 @@ public class ManagementRemotingConnection implements RemotingConnection {
       }
 
       @Override
-      public void afterDeliver(MessageReference ref, Message message, ServerConsumer consumerID, int deliveryCount) {
-
-      }
-
-      @Override
       public int sendLargeMessage(MessageReference reference,
                                   Message message,
                                   ServerConsumer consumerID,
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
index fce0dd5..f53d028 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
@@ -133,11 +133,6 @@ public final class CoreSessionCallback implements SessionCallback {
    }
 
    @Override
-   public void afterDeliver(MessageReference ref, Message message, ServerConsumer consumerID, int deliveryCount) {
-      // no op
-   }
-
-   @Override
    public void sendProducerCreditsMessage(int credits, SimpleString address) {
       Packet packet = new SessionProducerCreditsMessage(credits, address);
 
@@ -150,6 +145,11 @@ public final class CoreSessionCallback implements SessionCallback {
    }
 
    @Override
+   public void afterDelivery() throws Exception {
+
+   }
+
+   @Override
    public void sendProducerCreditsFailMessage(int credits, SimpleString address) {
       Packet packet = new SessionProducerCreditsFailMessage(credits, address);
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java
index 269c74f..babddc2 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java
@@ -25,10 +25,6 @@ import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
 
 public interface Consumer extends PriorityAware {
 
-   interface GroupHandler {
-      MessageReference handleMessageGroup(MessageReference ref, Consumer consumer, boolean newGroup);
-   }
-
    /**
     *
     * @see SessionCallback#supportsDirectDelivery()
@@ -38,7 +34,13 @@ public interface Consumer extends PriorityAware {
    }
 
    /**
-
+    * There was a change on semantic during 2.3 here.<br>
+    * We now first accept the message, and the actual deliver is done as part of
+    * {@link #proceedDeliver(MessageReference)}. This is to avoid holding a lock on the queues while
+    * the delivery is being accomplished To avoid a lock on the queue in case of misbehaving
+    * consumers.
+    * <p>
+    * This should return busy if handle is called before proceed deliver is called
     *
     * @param reference
     * @return
@@ -46,29 +48,19 @@ public interface Consumer extends PriorityAware {
     */
    HandleStatus handle(MessageReference reference) throws Exception;
 
-   /**
-    * This will return {@link HandleStatus#BUSY} if busy, {@link HandleStatus#NO_MATCH} if no match, or the MessageReference is handled
-    * This should return busy if handle is called before proceed deliver is called
-    * @param groupHandler
-    * @param reference
-    * @return
-    * @throws Exception
-    */
-   default Object handleWithGroup(GroupHandler groupHandler, boolean newGroup, MessageReference reference) throws Exception {
-      return handle(reference);
-   }
-
    /** wakes up internal threads to deliver more messages */
    default void promptDelivery() {
    }
 
    /**
-    * This will called after delivery
-    * Giving protocols a chance to complete their deliveries doing things such as individualACK outside of main locks
+    * This will proceed with the actual delivery.
+    * Notice that handle should hold a readLock and proceedDelivery should release the readLock
+    * any lock operation on Consumer should also get a writeLock on the readWriteLock
+    * to guarantee there are no pending deliveries
     *
     * @throws Exception
     */
-   void afterDeliver(MessageReference reference) throws Exception;
+   void proceedDeliver(MessageReference reference) throws Exception;
 
    Filter getFilter();
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
index 4c73261..7d5bbe8 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
@@ -689,7 +689,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
    // FailureListener implementation --------------------------------
 
    @Override
-   public void afterDeliver(MessageReference ref) {
+   public void proceedDeliver(MessageReference ref) {
       // no op
    }
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
index 44a5e0b..7982018 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
@@ -193,7 +193,7 @@ public class Redistributor implements Consumer {
    }
 
    @Override
-   public void afterDeliver(MessageReference ref) {
+   public void proceedDeliver(MessageReference ref) {
       // no op
    }
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 292d15f..998ef8c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -98,6 +98,7 @@ import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.utils.BooleanUtil;
 import org.apache.activemq.artemis.utils.Env;
 import org.apache.activemq.artemis.utils.ReferenceCounter;
+import org.apache.activemq.artemis.utils.ReusableLatch;
 import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
 import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
 import org.apache.activemq.artemis.utils.collections.PriorityLinkedList;
@@ -113,7 +114,7 @@ import org.jboss.logging.Logger;
  * <p>
  * Completely non blocking between adding to queue and delivering to consumers.
  */
-public class QueueImpl extends CriticalComponentImpl implements Queue, Consumer.GroupHandler {
+public class QueueImpl extends CriticalComponentImpl implements Queue {
 
    protected static final int CRITICAL_PATHS = 5;
    protected static final int CRITICAL_PATH_ADD_TAIL = 0;
@@ -267,6 +268,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue, Consumer.
 
    private final ExpiryScanner expiryScanner = new ExpiryScanner();
 
+   private final ReusableLatch deliveriesInTransit = new ReusableLatch(0);
+
    private final AtomicLong queueRateCheckTime = new AtomicLong(System.currentTimeMillis());
 
    private final AtomicLong messagesAddedSnapshot = new AtomicLong(0);
@@ -952,7 +955,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue, Consumer.
                // directDeliver flag to be re-computed resulting in direct delivery if the queue is empty
                // We don't recompute it on every delivery since executing isEmpty is expensive for a ConcurrentQueue
 
-               if (getExecutor().isFlushed() &&
+               if (deliveriesInTransit.getCount() == 0 && getExecutor().isFlushed() &&
                   intermediateMessageReferences.isEmpty() && messageReferences.isEmpty() &&
                   pageIterator != null && !pageIterator.hasNext() &&
                   pageSubscription != null && !pageSubscription.isPaging()) {
@@ -971,7 +974,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue, Consumer.
             }
          }
 
-         if (direct && supportsDirectDeliver && directDeliver && deliverDirect(ref)) {
+         if (direct && supportsDirectDeliver && directDeliver && deliveriesInTransit.getCount() == 0 && deliverDirect(ref)) {
             return;
          }
 
@@ -1002,6 +1005,23 @@ public class QueueImpl extends CriticalComponentImpl implements Queue, Consumer.
       return false;
    }
 
+   /**
+    * This will wait for any pending deliveries to finish
+    */
+   private boolean flushDeliveriesInTransit() {
+      try {
+         if (deliveriesInTransit.await(DELIVERY_TIMEOUT)) {
+            return true;
+         } else {
+            ActiveMQServerLogger.LOGGER.timeoutFlushInTransit(getName().toString(), getAddress().toString());
+            return false;
+         }
+      } catch (Exception e) {
+         ActiveMQServerLogger.LOGGER.unableToFlushDeliveries(e);
+         return false;
+      }
+   }
+
    @Override
    public void forceDelivery() {
       if (pageSubscription != null && pageSubscription.isPaging()) {
@@ -2346,6 +2366,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue, Consumer.
    @Override
    public synchronized void pause(boolean persist) {
       try {
+         this.flushDeliveriesInTransit();
          if (persist && isDurable()) {
             if (pauseStatusRecord >= 0) {
                storageManager.deleteQueueStatus(pauseStatusRecord);
@@ -2586,16 +2607,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue, Consumer.
                   consumer = groupConsumer;
                }
 
-               Object handleValue = handle(ref, consumer, groupConsumer == null);
-
-               HandleStatus status;
-
-               if (handleValue instanceof MessageReference) {
-                  ref = (MessageReference) handleValue;
-                  status = HandleStatus.HANDLED;
-               } else {
-                  status = (HandleStatus) handleValue;
-               }
+               HandleStatus status = handle(ref, consumer);
 
                if (status == HandleStatus.HANDLED) {
 
@@ -2603,6 +2615,13 @@ public class QueueImpl extends CriticalComponentImpl implements Queue, Consumer.
                   // this is to avoid breaks on the loop when checking for any other factors.
                   noDelivery = 0;
 
+                  if (redistributor == null) {
+                     ref = handleMessageGroup(ref, consumer, groupConsumer, groupID);
+                  }
+
+                  deliveriesInTransit.countUp();
+
+
                   removeMessageReference(holder, ref);
                   handledconsumer = consumer;
                   handled++;
@@ -2634,10 +2653,16 @@ public class QueueImpl extends CriticalComponentImpl implements Queue, Consumer.
                // Round robin'd all
 
                if (noDelivery == this.consumers.size()) {
-                  if (logger.isDebugEnabled()) {
-                     logger.debug(this + "::All the consumers were busy, giving up now");
+                  if (handledconsumer != null) {
+                     // this shouldn't really happen,
+                     // however I'm keeping this as an assertion case future developers ever change the logic here on this class
+                     ActiveMQServerLogger.LOGGER.nonDeliveryHandled();
+                  } else {
+                     if (logger.isDebugEnabled()) {
+                        logger.debug(this + "::All the consumers were busy, giving up now");
+                     }
+                     break;
                   }
-                  break;
                }
 
                noDelivery = 0;
@@ -2645,7 +2670,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue, Consumer.
          }
 
          if (handledconsumer != null) {
-            afterDeliver(handledconsumer, ref);
+            proceedDeliver(handledconsumer, ref);
          }
       }
 
@@ -3173,7 +3198,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue, Consumer.
       }
    }
 
-   private boolean deliver(MessageReference ref) {
+   private boolean deliver(final MessageReference ref) {
       synchronized (this) {
          if (!supportsDirectDeliver) {
             return false;
@@ -3200,24 +3225,20 @@ public class QueueImpl extends CriticalComponentImpl implements Queue, Consumer.
                consumer = groupConsumer;
             }
 
-            Object handleValue = handle(ref, consumer, groupConsumer == null);
-
-
-            HandleStatus status;
-
-            final MessageReference reference;
-            if (handleValue instanceof MessageReference) {
-               reference = (MessageReference) handleValue;
-               status = HandleStatus.HANDLED;
-            } else {
-               reference = ref;
-               status = (HandleStatus) handleValue;
-            }
-
+            HandleStatus status = handle(ref, consumer);
 
             if (status == HandleStatus.HANDLED) {
+               final MessageReference reference;
+               if (redistributor == null) {
+                  reference = handleMessageGroup(ref, consumer, groupConsumer, groupID);
+               } else {
+                  reference = ref;
+               }
+
                messagesAdded.incrementAndGet();
 
+               deliveriesInTransit.countUp();
+               proceedDeliver(consumer, reference);
                consumers.reset();
                return true;
             }
@@ -3248,16 +3269,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue, Consumer.
       return groupConsumer;
    }
 
-   /** This is {@link Consumer.GroupHandler#handleMessageGroup(MessageReference, Consumer, boolean)} */
-   @Override
-   public MessageReference handleMessageGroup(MessageReference ref, Consumer consumer, boolean newGroup) {
-      if (redistributor != null) {
-         // no grouping work on this case
-         return ref;
-      }
-      SimpleString groupID = extractGroupID(ref);
+   private MessageReference handleMessageGroup(MessageReference ref, Consumer consumer, Consumer groupConsumer, SimpleString groupID) {
       if (exclusive) {
-         if (newGroup) {
+         if (groupConsumer == null) {
             exclusiveConsumer = consumer;
             if (groupFirstKey != null) {
                return new GroupFirstMessageReference(groupFirstKey, ref);
@@ -3268,7 +3282,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue, Consumer.
          if (extractGroupSequence(ref) == -1) {
             groups.remove(groupID);
             consumers.repeat();
-         } else if (newGroup) {
+         } else if (groupConsumer == null) {
             groups.put(groupID, consumer);
             if (groupFirstKey != null) {
                return new GroupFirstMessageReference(groupFirstKey, ref);
@@ -3280,11 +3294,13 @@ public class QueueImpl extends CriticalComponentImpl implements Queue, Consumer.
       return ref;
    }
 
-   private void afterDeliver(Consumer consumer, MessageReference reference) {
+   private void proceedDeliver(Consumer consumer, MessageReference reference) {
       try {
-         consumer.afterDeliver(reference);
+         consumer.proceedDeliver(reference);
       } catch (Throwable t) {
          errorProcessing(consumer, t, reference);
+      } finally {
+         deliveriesInTransit.countDown();
       }
    }
 
@@ -3329,10 +3345,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue, Consumer.
       }
    }
 
-   private synchronized Object handle(final MessageReference reference, final Consumer consumer, boolean newGroup) {
-      Object status;
+   private synchronized HandleStatus handle(final MessageReference reference, final Consumer consumer) {
+      HandleStatus status;
       try {
-         status = consumer.handleWithGroup(this, newGroup, reference);
+         status = consumer.handle(reference);
       } catch (Throwable t) {
          ActiveMQServerLogger.LOGGER.removingBadConsumer(t, consumer, reference);
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index dde8d87..c709d4e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -24,8 +24,11 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
@@ -104,6 +107,13 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
 
    private SlowConsumerDetectionListener slowConsumerListener;
 
+   /**
+    * We get a readLock when a message is handled, and return the readLock when the message is finally delivered
+    * When stopping the consumer we need to get a writeLock to make sure we had all delivery finished
+    * otherwise a rollback may get message sneaking in
+    */
+   private final ReadWriteLock lockDelivery = new ReentrantReadWriteLock();
+
    private volatile AtomicInteger availableCredits = new AtomicInteger(0);
 
    private boolean started;
@@ -382,20 +392,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
       messageQueue.errorProcessing(this, e, deliveryObject);
    }
 
-   /** This is in case someone is using direct old API */
-   @Override
-   public HandleStatus handle(MessageReference ref) throws Exception {
-      Object refReturn = handleWithGroup(null, false, ref);
-
-      if (refReturn instanceof MessageReference) {
-         return HandleStatus.HANDLED;
-      } else {
-         return (HandleStatus) refReturn;
-      }
-
-   }
    @Override
-   public Object handleWithGroup(GroupHandler handler, boolean newGroup, final MessageReference ref) throws Exception {
+   public HandleStatus handle(final MessageReference ref) throws Exception {
       // available credits can be set back to null with a flow control option.
       AtomicInteger checkInteger = availableCredits;
       if (callback != null && !callback.hasCredits(this) || checkInteger != null && checkInteger.get() <= 0) {
@@ -483,46 +481,42 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
 
          }
 
-         MessageReference deliveryReference = ref;
-
-         if (handler != null) {
-            deliveryReference = handler.handleMessageGroup(ref, this, newGroup);
-         }
-
-         proceedDeliver(deliveryReference);
+         lockDelivery.readLock().lock();
 
          return HandleStatus.HANDLED;
       }
    }
 
-   private void proceedDeliver(MessageReference reference) throws Exception {
-      Message message = reference.getMessage();
+   @Override
+   public void proceedDeliver(MessageReference reference) throws Exception {
+      try {
+         Message message = reference.getMessage();
 
-      if (server.hasBrokerMessagePlugins()) {
-         server.callBrokerMessagePlugins(plugin -> plugin.beforeDeliver(this, reference));
-      }
+         if (server.hasBrokerMessagePlugins()) {
+            server.callBrokerMessagePlugins(plugin -> plugin.beforeDeliver(this, reference));
+         }
 
-      if (message.isLargeMessage() && supportLargeMessage) {
-         if (largeMessageDeliverer == null) {
-            // This can't really happen as handle had already crated the deliverer
-            // instead of throwing an exception in weird cases there is no problem on just go ahead and create it
-            // again here
-            largeMessageDeliverer = new LargeMessageDeliverer((LargeServerMessage) message, reference);
+         if (message.isLargeMessage() && supportLargeMessage) {
+            if (largeMessageDeliverer == null) {
+               // This can't really happen as handle had already crated the deliverer
+               // instead of throwing an exception in weird cases there is no problem on just go ahead and create it
+               // again here
+               largeMessageDeliverer = new LargeMessageDeliverer((LargeServerMessage) message, reference);
+            }
+            // The deliverer was prepared during handle, as we can't have more than one pending large message
+            // as it would return busy if there is anything pending
+            largeMessageDeliverer.deliver();
+         } else {
+            deliverStandardMessage(reference, message);
+         }
+      } finally {
+         lockDelivery.readLock().unlock();
+         callback.afterDelivery();
+         if (server.hasBrokerMessagePlugins()) {
+            server.callBrokerMessagePlugins(plugin -> plugin.afterDeliver(this, reference));
          }
-         // The deliverer was prepared during handle, as we can't have more than one pending large message
-         // as it would return busy if there is anything pending
-         largeMessageDeliverer.deliver();
-      } else {
-         deliverStandardMessage(reference, message);
       }
-   }
 
-   @Override
-   public void afterDeliver(MessageReference reference) throws Exception {
-      callback.afterDeliver(reference, reference.getMessage(), ServerConsumerImpl.this, reference.getDeliveryCount());
-      if (server.hasBrokerMessagePlugins()) {
-         server.callBrokerMessagePlugins(plugin -> plugin.afterDeliver(this, reference));
-      }
    }
 
    @Override
@@ -632,7 +626,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
     * there are no other messages to be delivered.
     */
    @Override
-   public void forceDelivery(final long sequence) {
+   public void forceDelivery(final long sequence)  {
       forceDelivery(sequence, () -> {
          Message forcedDeliveryMessage = new CoreMessage(storageManager.generateID(), 50);
          MessageReference reference = MessageReference.Factory.createReference(forcedDeliveryMessage, messageQueue);
@@ -736,7 +730,19 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
    @Override
    public void setStarted(final boolean started) {
       synchronized (lock) {
-         this.started = browseOnly || started;
+         boolean locked = lockDelivery();
+
+         // This is to make sure nothing would sneak to the client while started = false
+         // the client will stop the session and perform a rollback in certain cases.
+         // in case something sneaks to the client you could get to messaging delivering forever until
+         // you restart the server
+         try {
+            this.started = browseOnly || started;
+         } finally {
+            if (locked) {
+               lockDelivery.writeLock().unlock();
+            }
+         }
       }
 
       // Outside the lock
@@ -745,10 +751,35 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
       }
    }
 
+   private boolean lockDelivery() {
+      try {
+         if (!lockDelivery.writeLock().tryLock(30, TimeUnit.SECONDS)) {
+            ActiveMQServerLogger.LOGGER.timeoutLockingConsumer();
+            if (server != null) {
+               server.threadDump();
+            }
+            return false;
+         }
+         return true;
+      } catch (Exception e) {
+         ActiveMQServerLogger.LOGGER.failedToFinishDelivery(e);
+         return false;
+      }
+   }
+
    @Override
    public void setTransferring(final boolean transferring) {
       synchronized (lock) {
-         this.transferring = transferring;
+         // This is to make sure that the delivery process has finished any pending delivery
+         // otherwise a message may sneak in on the client while we are trying to stop the consumer
+         boolean locked = lockDelivery();
+         try {
+            this.transferring = transferring;
+         } finally {
+            if (locked) {
+               lockDelivery.writeLock().unlock();
+            }
+         }
       }
 
       // Outside the lock
@@ -1244,111 +1275,125 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
       }
 
       public boolean deliver() throws Exception {
-         if (!started) {
-            return false;
-         }
+         lockDelivery.readLock().lock();
+         try {
+            if (!started) {
+               return false;
+            }
 
-         LargeServerMessage currentLargeMessage = largeMessage;
-         if (currentLargeMessage == null) {
-            return true;
-         }
+            LargeServerMessage currentLargeMessage = largeMessage;
+            if (currentLargeMessage == null) {
+               return true;
+            }
 
-         if (availableCredits != null && availableCredits.get() <= 0) {
-            if (logger.isTraceEnabled()) {
-               logger.trace(this + "::FlowControl::delivery largeMessage interrupting as there are no more credits, available=" + availableCredits);
+            if (availableCredits != null && availableCredits.get() <= 0) {
+               if (logger.isTraceEnabled()) {
+                  logger.trace(this + "::FlowControl::delivery largeMessage interrupting as there are no more credits, available=" +
+                                  availableCredits);
+               }
+               releaseHeapBodyBuffer();
+               return false;
             }
-            releaseHeapBodyBuffer();
-            return false;
-         }
 
-         if (!sentInitialPacket) {
-            context = currentLargeMessage.getBodyEncoder();
+            if (!sentInitialPacket) {
+               context = currentLargeMessage.getBodyEncoder();
 
-            sizePendingLargeMessage = context.getLargeBodySize();
+               sizePendingLargeMessage = context.getLargeBodySize();
 
-            context.open();
+               context.open();
 
-            sentInitialPacket = true;
+               sentInitialPacket = true;
 
-            int packetSize = callback.sendLargeMessage(ref, currentLargeMessage, ServerConsumerImpl.this, context.getLargeBodySize(), ref.getDeliveryCount());
+               int packetSize = callback.sendLargeMessage(ref, currentLargeMessage, ServerConsumerImpl.this, context.getLargeBodySize(), ref.getDeliveryCount());
 
-            if (availableCredits != null) {
-               final int credits = availableCredits.addAndGet(-packetSize);
+               if (availableCredits != null) {
+                  final int credits = availableCredits.addAndGet(-packetSize);
 
-               if (credits <= 0) {
-                  releaseHeapBodyBuffer();
-               }
+                  if (credits <= 0) {
+                     releaseHeapBodyBuffer();
+                  }
 
-               if (logger.isTraceEnabled()) {
-                  logger.trace(this + "::FlowControl::" + " deliver initialpackage with " + packetSize + " delivered, available now = " + availableCredits);
+                  if (logger.isTraceEnabled()) {
+                     logger.trace(this + "::FlowControl::" +
+                                     " deliver initialpackage with " +
+                                     packetSize +
+                                     " delivered, available now = " +
+                                     availableCredits);
+                  }
                }
-            }
 
-            // Execute the rest of the large message on a different thread so as not to tie up the delivery thread
-            // for too long
+               // Execute the rest of the large message on a different thread so as not to tie up the delivery thread
+               // for too long
 
-            resumeLargeMessage();
+               resumeLargeMessage();
 
-            return false;
-         } else {
-            if (availableCredits != null && availableCredits.get() <= 0) {
-               if (logger.isTraceEnabled()) {
-                  logger.trace(this + "::FlowControl::deliverLargeMessage Leaving loop of send LargeMessage because of credits, available=" + availableCredits);
-               }
-               releaseHeapBodyBuffer();
                return false;
-            }
+            } else {
+               if (availableCredits != null && availableCredits.get() <= 0) {
+                  if (logger.isTraceEnabled()) {
+                     logger.trace(this + "::FlowControl::deliverLargeMessage Leaving loop of send LargeMessage because of credits, available=" +
+                                     availableCredits);
+                  }
+                  releaseHeapBodyBuffer();
+                  return false;
+               }
 
-            final int localChunkLen = (int) Math.min(sizePendingLargeMessage - positionPendingLargeMessage, minLargeMessageSize);
+               final int localChunkLen = (int) Math.min(sizePendingLargeMessage - positionPendingLargeMessage, minLargeMessageSize);
 
-            final ByteBuffer bodyBuffer = acquireHeapBodyBuffer(localChunkLen);
+               final ByteBuffer bodyBuffer = acquireHeapBodyBuffer(localChunkLen);
 
-            assert bodyBuffer.remaining() == localChunkLen;
+               assert bodyBuffer.remaining() == localChunkLen;
 
-            final int readBytes = context.encode(bodyBuffer);
+               final int readBytes = context.encode(bodyBuffer);
 
-            assert readBytes == localChunkLen;
+               assert readBytes == localChunkLen;
 
-            final byte[] body = bodyBuffer.array();
+               final byte[] body = bodyBuffer.array();
 
-            assert body.length == readBytes;
+               assert body.length == readBytes;
 
-            //It is possible to recycle the same heap body buffer because it won't be cached by sendLargeMessageContinuation
-            //given that requiresResponse is false: ChannelImpl::send will use the resend cache only if
-            //resendCache != null && packet.isRequiresConfirmations()
+               //It is possible to recycle the same heap body buffer because it won't be cached by sendLargeMessageContinuation
+               //given that requiresResponse is false: ChannelImpl::send will use the resend cache only if
+               //resendCache != null && packet.isRequiresConfirmations()
 
-            int packetSize = callback.sendLargeMessageContinuation(ServerConsumerImpl.this, body, positionPendingLargeMessage + localChunkLen < sizePendingLargeMessage, false);
+               int packetSize = callback.sendLargeMessageContinuation(ServerConsumerImpl.this, body, positionPendingLargeMessage + localChunkLen < sizePendingLargeMessage, false);
 
-            int chunkLen = body.length;
+               int chunkLen = body.length;
 
-            if (availableCredits != null) {
-               final int credits = availableCredits.addAndGet(-packetSize);
+               if (availableCredits != null) {
+                  final int credits = availableCredits.addAndGet(-packetSize);
 
-               if (credits <= 0) {
-                  releaseHeapBodyBuffer();
-               }
+                  if (credits <= 0) {
+                     releaseHeapBodyBuffer();
+                  }
 
-               if (logger.isTraceEnabled()) {
-                  logger.trace(this + "::FlowControl::largeMessage deliver continuation, packetSize=" + packetSize + " available now=" + availableCredits);
+                  if (logger.isTraceEnabled()) {
+                     logger.trace(this + "::FlowControl::largeMessage deliver continuation, packetSize=" +
+                                     packetSize +
+                                     " available now=" +
+                                     availableCredits);
+                  }
                }
-            }
 
-            positionPendingLargeMessage += chunkLen;
+               positionPendingLargeMessage += chunkLen;
 
-            if (positionPendingLargeMessage < sizePendingLargeMessage) {
-               resumeLargeMessage();
+               if (positionPendingLargeMessage < sizePendingLargeMessage) {
+                  resumeLargeMessage();
 
-               return false;
+                  return false;
+               }
             }
-         }
 
-         if (logger.isTraceEnabled()) {
-            logger.trace("Finished deliverLargeMessage");
-         }
+            if (logger.isTraceEnabled()) {
+               logger.trace("Finished deliverLargeMessage");
+            }
 
-         finish();
+            finish();
 
-         return true;
+            return true;
+         } finally {
+            lockDelivery.readLock().unlock();
+         }
       }
 
       public void finish() throws Exception {
@@ -1408,7 +1453,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
                }
 
                if (status == HandleStatus.HANDLED) {
-                  afterDeliver(current);
+                  proceedDeliver(current);
                }
 
                current = null;
@@ -1436,7 +1481,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
                }
 
                if (status == HandleStatus.HANDLED) {
-                  afterDeliver(ref);
+                  proceedDeliver(ref);
                } else if (status == HandleStatus.BUSY) {
                   // keep a reference on the current message reference
                   // to handle it next time the browser deliverer is executed
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
index 18ef253..5577522 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
@@ -41,10 +41,11 @@ public interface SessionCallback {
     */
    boolean hasCredits(ServerConsumer consumerID);
 
-   // Certain protocols (MQTT) will need to confirm messages doing things such as individualACKS
-   // and these need to be done outside of the main lock.
-   // otherwise we could dead-lock during delivery
-   void afterDeliver(MessageReference ref, Message message, ServerConsumer consumerID, int deliveryCount) throws Exception;
+   /**
+    * This can be used to complete certain operations outside of the lock,
+    * like acks or other operations.
+    */
+   void afterDelivery() throws Exception;
 
    /**
     * Use this to updates specifics on the message after a redelivery happened.
@@ -68,7 +69,6 @@ public interface SessionCallback {
    //       Future developments may change this, but beware why I have chosen to keep the parameter separated here
    int sendMessage(MessageReference ref, Message message, ServerConsumer consumerID, int deliveryCount);
 
-
    int sendLargeMessage(MessageReference reference,
                         Message message,
                         ServerConsumer consumerID,
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
index 9bc0ea2..58bf2d3 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
@@ -190,7 +190,7 @@ public class DummyServerConsumer implements ServerConsumer {
    }
 
    @Override
-   public void afterDeliver(MessageReference reference) throws Exception {
+   public void proceedDeliver(MessageReference reference) throws Exception {
 
    }
 
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
index 0006752..3e64ac5 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
@@ -528,6 +528,11 @@ public class HangConsumerTest extends ActiveMQTestBase {
       }
 
       @Override
+      public void afterDelivery() throws Exception {
+
+      }
+
+      @Override
       public void sendProducerCreditsFailMessage(int credits, SimpleString address) {
          targetCallback.sendProducerCreditsFailMessage(credits, address);
       }
@@ -553,11 +558,6 @@ public class HangConsumerTest extends ActiveMQTestBase {
          }
       }
 
-      @Override
-      public void afterDeliver(MessageReference ref, Message message, ServerConsumer consumerID, int deliveryCount) {
-
-      }
-
       /* (non-Javadoc)
        * @see SessionCallback#sendLargeMessage(org.apache.activemq.artemis.core.server.ServerMessage, long, long, int)
        */
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/GroupingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/GroupingTest.java
index fc32c44..ba8cd95 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/GroupingTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/GroupingTest.java
@@ -271,6 +271,7 @@ public class GroupingTest extends JMSTestBase {
 
          assertEquals(tm.getStringProperty("JMSXGroupID"), jmsxgroupID);
       }
+      Thread.sleep(2000);
       //session.rollback();
       //session.close();
       //consume all msgs from 2nd first consumer
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java
index 076e164..f20cce3 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java
@@ -1312,21 +1312,12 @@ public class QueueImplTest extends ActiveMQTestBase {
 
          @Override
          public synchronized HandleStatus handle(MessageReference reference) {
-            return HandleStatus.HANDLED;
-         }
-
-         @Override
-         public Object handleWithGroup(GroupHandler groupHandler, boolean newGroup, MessageReference reference) throws Exception {
             if (count == 0) {
                //the first message is handled and will be used to determine this consumer
                //to be the group consumer
                count++;
                firstMessageHandled.countDown();
-               if (groupHandler != null) {
-                  return groupHandler.handleMessageGroup(reference, this, newGroup);
-               } else {
-                  return HandleStatus.HANDLED;
-               }
+               return HandleStatus.HANDLED;
             } else if (count <= 2) {
                //the next two attempts to send the second message will be done
                //attempting a direct delivery and an async one after that
@@ -1338,11 +1329,7 @@ public class QueueImplTest extends ActiveMQTestBase {
                //the second message should have stop the delivery loop:
                //it will succeed just to let the message being handled and
                //reduce the message count to 0
-               if (groupHandler != null) {
-                  return groupHandler.handleMessageGroup(reference, this, newGroup);
-               } else {
-                  return HandleStatus.HANDLED;
-               }
+               return HandleStatus.HANDLED;
             }
          }
       };
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java
index 47b042b..2a5a330 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java
@@ -138,7 +138,7 @@ public class FakeConsumer implements Consumer {
    }
 
    @Override
-   public void afterDeliver(MessageReference ref) throws Exception {
+   public void proceedDeliver(MessageReference ref) throws Exception {
       // no op
    }