You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2019/07/27 03:56:33 UTC

[activemq-artemis] 02/03: ARTEMIS-2423 Improving Consumer/Queue Delivery lock

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 7507a9fd4b282523c2b2f3517ed788153a35df4c
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Thu Jul 25 12:10:24 2019 -0400

    ARTEMIS-2423 Improving Consumer/Queue Delivery lock
---
 .../protocol/amqp/broker/AMQPSessionCallback.java  |  10 +-
 .../core/protocol/mqtt/MQTTPublishManager.java     |  24 +-
 .../core/protocol/mqtt/MQTTSessionCallback.java    |  14 +-
 .../core/protocol/openwire/amq/AMQSession.java     |  14 +-
 .../artemis/core/protocol/stomp/StompSession.java  |   3 +-
 .../impl/ManagementRemotingConnection.java         |  10 +-
 .../protocol/core/impl/CoreSessionCallback.java    |  10 +-
 .../activemq/artemis/core/server/Consumer.java     |  32 ++-
 .../core/server/cluster/impl/BridgeImpl.java       |   2 +-
 .../core/server/cluster/impl/Redistributor.java    |   2 +-
 .../artemis/core/server/impl/QueueImpl.java        | 110 ++++----
 .../core/server/impl/ServerConsumerImpl.java       | 277 +++++++++------------
 .../artemis/spi/core/protocol/SessionCallback.java |  10 +-
 .../tests/integration/cli/DummyServerConsumer.java |   2 +-
 .../tests/integration/client/HangConsumerTest.java |  10 +-
 .../tests/integration/jms/client/GroupingTest.java |   1 -
 .../tests/unit/core/server/impl/QueueImplTest.java |  17 +-
 .../unit/core/server/impl/fakes/FakeConsumer.java  |   2 +-
 18 files changed, 268 insertions(+), 282 deletions(-)

diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index f850cc1..616da5b 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -186,11 +186,6 @@ public class AMQPSessionCallback implements SessionCallback {
                                                         (String) null, this, true, operationContext, manager.getPrefixes());
    }
 
-   @Override
-   public void afterDelivery() throws Exception {
-
-   }
-
    public void start() {
 
    }
@@ -605,6 +600,11 @@ public class AMQPSessionCallback implements SessionCallback {
    }
 
    @Override
+   public void afterDeliver(MessageReference ref, Message message, ServerConsumer consumerID, int deliveryCount) {
+
+   }
+
+   @Override
    public int sendLargeMessage(MessageReference ref,
                                Message message,
                                ServerConsumer consumer,
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
index abcfe3f..bb38539 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
@@ -59,6 +59,12 @@ public class MQTTPublishManager {
 
    private MQTTSessionState.OutboundStore outboundStore;
 
+   /** This is the QoS decided during the last delivery (null when the delivery was for the management consumer).
+    *  Since afterDeliver happens in the same thread as the delivery, no other thread should be calling delivery and afterDeliver,
+    *  so it is safe to keep this value here.
+    */
+   private Integer currentQos;
+
    public MQTTPublishManager(MQTTSession session) {
       this.session = session;
    }
@@ -108,7 +114,6 @@ public class MQTTPublishManager {
    boolean isManagementConsumer(ServerConsumer consumer) {
       return consumer == managementConsumer;
    }
-
    /**
     * Since MQTT Subscriptions can over lap; a client may receive the same message twice.  When this happens the client
     * returns a PubRec or PubAck with ID.  But we need to know which consumer to ack, since we only have the ID to go on we
@@ -119,20 +124,35 @@ public class MQTTPublishManager {
    protected void sendMessage(ICoreMessage message, ServerConsumer consumer, int deliveryCount) throws Exception {
       // This is to allow retries of PubRel.
       if (isManagementConsumer(consumer)) {
+         currentQos = null;
          sendPubRelMessage(message);
       } else {
          int qos = decideQoS(message, consumer);
+         currentQos = qos;
          if (qos == 0) {
             sendServerMessage((int) message.getMessageID(),  message, deliveryCount, qos);
-            session.getServerSession().individualAcknowledge(consumer.getID(), message.getMessageID());
          } else if (qos == 1 || qos == 2) {
             int mqttid = outboundStore.generateMqttId(message.getMessageID(), consumer.getID());
             outboundStore.publish(mqttid, message.getMessageID(), consumer.getID());
             sendServerMessage(mqttid, message, deliveryCount, qos);
          } else {
+            // this will happen during afterDeliver
+         }
+      }
+   }
+
+   protected void confirmMessage(ICoreMessage message, ServerConsumer consumer, int deliveryCount) throws Exception {
+      if (currentQos != null) {
+         int qos = currentQos.intValue();
+         if (qos == 0) {
+            session.getServerSession().individualAcknowledge(consumer.getID(), message.getMessageID());
+         } else if (qos == 1 || qos == 2) {
+            // everything happened in delivery
+         } else {
             // Client must have disconnected and it's Subscription QoS cleared
             consumer.individualCancel(message.getMessageID(), false);
          }
+
       }
    }
 
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
index 50d5732..168b7fa 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
@@ -60,6 +60,15 @@ public class MQTTSessionCallback implements SessionCallback {
    }
 
    @Override
+   public void afterDeliver(MessageReference ref, Message message, ServerConsumer consumer, int deliveryCount) {
+      try {
+         session.getMqttPublishManager().confirmMessage(message.toCore(), consumer, deliveryCount);
+      } catch (Exception e) {
+         log.warn("Unable to send message: " + message.getMessageID() + " Cause: " + e.getMessage(), e);
+      }
+   }
+
+   @Override
    public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed) {
       return false;
    }
@@ -91,11 +100,6 @@ public class MQTTSessionCallback implements SessionCallback {
    }
 
    @Override
-   public void afterDelivery() throws Exception {
-
-   }
-
-   @Override
    public void browserFinished(ServerConsumer consumer) {
 
    }
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index b780563..e4ecd48 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -270,12 +270,6 @@ public class AMQSession implements SessionCallback {
 
    }
 
-   // rename actualDest to destination
-   @Override
-   public void afterDelivery() throws Exception {
-
-   }
-
    @Override
    public void browserFinished(ServerConsumer consumer) {
       AMQConsumer theConsumer = (AMQConsumer) consumer.getProtocolData();
@@ -313,6 +307,14 @@ public class AMQSession implements SessionCallback {
    }
 
    @Override
+   public void afterDeliver(MessageReference ref,
+                            org.apache.activemq.artemis.api.core.Message message,
+                            ServerConsumer consumerID,
+                            int deliveryCount) {
+
+   }
+
+   @Override
    public int sendLargeMessage(MessageReference reference,
                                org.apache.activemq.artemis.api.core.Message message,
                                ServerConsumer consumerID,
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
index 80bbbe8..03f9b44 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
@@ -110,8 +110,9 @@ public class StompSession implements SessionCallback {
    public void sendProducerCreditsFailMessage(int credits, SimpleString address) {
    }
 
+
    @Override
-   public void afterDelivery() throws Exception {
+   public void afterDeliver(MessageReference ref, Message message, ServerConsumer consumerID, int deliveryCount) throws Exception {
       PendingTask task;
       while ((task = afterDeliveryTasks.poll()) != null) {
          task.run();
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ManagementRemotingConnection.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ManagementRemotingConnection.java
index 7e760c1..35eab8a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ManagementRemotingConnection.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ManagementRemotingConnection.java
@@ -212,11 +212,6 @@ public class ManagementRemotingConnection implements RemotingConnection {
       }
 
       @Override
-      public void afterDelivery() throws Exception {
-
-      }
-
-      @Override
       public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed) {
          return false;
       }
@@ -237,6 +232,11 @@ public class ManagementRemotingConnection implements RemotingConnection {
       }
 
       @Override
+      public void afterDeliver(MessageReference ref, Message message, ServerConsumer consumerID, int deliveryCount) {
+
+      }
+
+      @Override
       public int sendLargeMessage(MessageReference reference,
                                   Message message,
                                   ServerConsumer consumerID,
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
index f53d028..fce0dd5 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
@@ -133,6 +133,11 @@ public final class CoreSessionCallback implements SessionCallback {
    }
 
    @Override
+   public void afterDeliver(MessageReference ref, Message message, ServerConsumer consumerID, int deliveryCount) {
+      // no op
+   }
+
+   @Override
    public void sendProducerCreditsMessage(int credits, SimpleString address) {
       Packet packet = new SessionProducerCreditsMessage(credits, address);
 
@@ -145,11 +150,6 @@ public final class CoreSessionCallback implements SessionCallback {
    }
 
    @Override
-   public void afterDelivery() throws Exception {
-
-   }
-
-   @Override
    public void sendProducerCreditsFailMessage(int credits, SimpleString address) {
       Packet packet = new SessionProducerCreditsFailMessage(credits, address);
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java
index babddc2..269c74f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java
@@ -25,6 +25,10 @@ import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
 
 public interface Consumer extends PriorityAware {
 
+   interface GroupHandler {
+      MessageReference handleMessageGroup(MessageReference ref, Consumer consumer, boolean newGroup);
+   }
+
    /**
     *
     * @see SessionCallback#supportsDirectDelivery()
@@ -34,13 +38,7 @@ public interface Consumer extends PriorityAware {
    }
 
    /**
-    * There was a change on semantic during 2.3 here.<br>
-    * We now first accept the message, and the actual deliver is done as part of
-    * {@link #proceedDeliver(MessageReference)}. This is to avoid holding a lock on the queues while
-    * the delivery is being accomplished To avoid a lock on the queue in case of misbehaving
-    * consumers.
-    * <p>
-    * This should return busy if handle is called before proceed deliver is called
+
     *
     * @param reference
     * @return
@@ -48,19 +46,29 @@ public interface Consumer extends PriorityAware {
     */
    HandleStatus handle(MessageReference reference) throws Exception;
 
+   /**
+    * This will return {@link HandleStatus#BUSY} if busy, {@link HandleStatus#NO_MATCH} if no match, or the MessageReference if it was handled.
+    * This should return busy if handle is called before proceed deliver is called
+    * @param groupHandler
+    * @param reference
+    * @return
+    * @throws Exception
+    */
+   default Object handleWithGroup(GroupHandler groupHandler, boolean newGroup, MessageReference reference) throws Exception {
+      return handle(reference);
+   }
+
    /** wakes up internal threads to deliver more messages */
    default void promptDelivery() {
    }
 
    /**
-    * This will proceed with the actual delivery.
-    * Notice that handle should hold a readLock and proceedDelivery should release the readLock
-    * any lock operation on Consumer should also get a writeLock on the readWriteLock
-    * to guarantee there are no pending deliveries
+    * This will be called after delivery,
+    * giving protocols a chance to complete their deliveries, doing things such as individualACK outside of the main locks.
     *
     * @throws Exception
     */
-   void proceedDeliver(MessageReference reference) throws Exception;
+   void afterDeliver(MessageReference reference) throws Exception;
 
    Filter getFilter();
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
index 7d5bbe8..4c73261 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
@@ -689,7 +689,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
    // FailureListener implementation --------------------------------
 
    @Override
-   public void proceedDeliver(MessageReference ref) {
+   public void afterDeliver(MessageReference ref) {
       // no op
    }
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
index 7982018..44a5e0b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
@@ -193,7 +193,7 @@ public class Redistributor implements Consumer {
    }
 
    @Override
-   public void proceedDeliver(MessageReference ref) {
+   public void afterDeliver(MessageReference ref) {
       // no op
    }
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 998ef8c..292d15f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -98,7 +98,6 @@ import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.utils.BooleanUtil;
 import org.apache.activemq.artemis.utils.Env;
 import org.apache.activemq.artemis.utils.ReferenceCounter;
-import org.apache.activemq.artemis.utils.ReusableLatch;
 import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
 import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
 import org.apache.activemq.artemis.utils.collections.PriorityLinkedList;
@@ -114,7 +113,7 @@ import org.jboss.logging.Logger;
  * <p>
  * Completely non blocking between adding to queue and delivering to consumers.
  */
-public class QueueImpl extends CriticalComponentImpl implements Queue {
+public class QueueImpl extends CriticalComponentImpl implements Queue, Consumer.GroupHandler {
 
    protected static final int CRITICAL_PATHS = 5;
    protected static final int CRITICAL_PATH_ADD_TAIL = 0;
@@ -268,8 +267,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
    private final ExpiryScanner expiryScanner = new ExpiryScanner();
 
-   private final ReusableLatch deliveriesInTransit = new ReusableLatch(0);
-
    private final AtomicLong queueRateCheckTime = new AtomicLong(System.currentTimeMillis());
 
    private final AtomicLong messagesAddedSnapshot = new AtomicLong(0);
@@ -955,7 +952,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
                // directDeliver flag to be re-computed resulting in direct delivery if the queue is empty
                // We don't recompute it on every delivery since executing isEmpty is expensive for a ConcurrentQueue
 
-               if (deliveriesInTransit.getCount() == 0 && getExecutor().isFlushed() &&
+               if (getExecutor().isFlushed() &&
                   intermediateMessageReferences.isEmpty() && messageReferences.isEmpty() &&
                   pageIterator != null && !pageIterator.hasNext() &&
                   pageSubscription != null && !pageSubscription.isPaging()) {
@@ -974,7 +971,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
             }
          }
 
-         if (direct && supportsDirectDeliver && directDeliver && deliveriesInTransit.getCount() == 0 && deliverDirect(ref)) {
+         if (direct && supportsDirectDeliver && directDeliver && deliverDirect(ref)) {
             return;
          }
 
@@ -1005,23 +1002,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       return false;
    }
 
-   /**
-    * This will wait for any pending deliveries to finish
-    */
-   private boolean flushDeliveriesInTransit() {
-      try {
-         if (deliveriesInTransit.await(DELIVERY_TIMEOUT)) {
-            return true;
-         } else {
-            ActiveMQServerLogger.LOGGER.timeoutFlushInTransit(getName().toString(), getAddress().toString());
-            return false;
-         }
-      } catch (Exception e) {
-         ActiveMQServerLogger.LOGGER.unableToFlushDeliveries(e);
-         return false;
-      }
-   }
-
    @Override
    public void forceDelivery() {
       if (pageSubscription != null && pageSubscription.isPaging()) {
@@ -2366,7 +2346,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
    @Override
    public synchronized void pause(boolean persist) {
       try {
-         this.flushDeliveriesInTransit();
          if (persist && isDurable()) {
             if (pauseStatusRecord >= 0) {
                storageManager.deleteQueueStatus(pauseStatusRecord);
@@ -2607,7 +2586,16 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
                   consumer = groupConsumer;
                }
 
-               HandleStatus status = handle(ref, consumer);
+               Object handleValue = handle(ref, consumer, groupConsumer == null);
+
+               HandleStatus status;
+
+               if (handleValue instanceof MessageReference) {
+                  ref = (MessageReference) handleValue;
+                  status = HandleStatus.HANDLED;
+               } else {
+                  status = (HandleStatus) handleValue;
+               }
 
                if (status == HandleStatus.HANDLED) {
 
@@ -2615,13 +2603,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
                   // this is to avoid breaks on the loop when checking for any other factors.
                   noDelivery = 0;
 
-                  if (redistributor == null) {
-                     ref = handleMessageGroup(ref, consumer, groupConsumer, groupID);
-                  }
-
-                  deliveriesInTransit.countUp();
-
-
                   removeMessageReference(holder, ref);
                   handledconsumer = consumer;
                   handled++;
@@ -2653,16 +2634,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
                // Round robin'd all
 
                if (noDelivery == this.consumers.size()) {
-                  if (handledconsumer != null) {
-                     // this shouldn't really happen,
-                     // however I'm keeping this as an assertion case future developers ever change the logic here on this class
-                     ActiveMQServerLogger.LOGGER.nonDeliveryHandled();
-                  } else {
-                     if (logger.isDebugEnabled()) {
-                        logger.debug(this + "::All the consumers were busy, giving up now");
-                     }
-                     break;
+                  if (logger.isDebugEnabled()) {
+                     logger.debug(this + "::All the consumers were busy, giving up now");
                   }
+                  break;
                }
 
                noDelivery = 0;
@@ -2670,7 +2645,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
          }
 
          if (handledconsumer != null) {
-            proceedDeliver(handledconsumer, ref);
+            afterDeliver(handledconsumer, ref);
          }
       }
 
@@ -3198,7 +3173,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       }
    }
 
-   private boolean deliver(final MessageReference ref) {
+   private boolean deliver(MessageReference ref) {
       synchronized (this) {
          if (!supportsDirectDeliver) {
             return false;
@@ -3225,20 +3200,24 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
                consumer = groupConsumer;
             }
 
-            HandleStatus status = handle(ref, consumer);
+            Object handleValue = handle(ref, consumer, groupConsumer == null);
 
-            if (status == HandleStatus.HANDLED) {
-               final MessageReference reference;
-               if (redistributor == null) {
-                  reference = handleMessageGroup(ref, consumer, groupConsumer, groupID);
-               } else {
-                  reference = ref;
-               }
 
+            HandleStatus status;
+
+            final MessageReference reference;
+            if (handleValue instanceof MessageReference) {
+               reference = (MessageReference) handleValue;
+               status = HandleStatus.HANDLED;
+            } else {
+               reference = ref;
+               status = (HandleStatus) handleValue;
+            }
+
+
+            if (status == HandleStatus.HANDLED) {
                messagesAdded.incrementAndGet();
 
-               deliveriesInTransit.countUp();
-               proceedDeliver(consumer, reference);
                consumers.reset();
                return true;
             }
@@ -3269,9 +3248,16 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       return groupConsumer;
    }
 
-   private MessageReference handleMessageGroup(MessageReference ref, Consumer consumer, Consumer groupConsumer, SimpleString groupID) {
+   /** This is {@link Consumer.GroupHandler#handleMessageGroup(MessageReference, Consumer, boolean)} */
+   @Override
+   public MessageReference handleMessageGroup(MessageReference ref, Consumer consumer, boolean newGroup) {
+      if (redistributor != null) {
+         // no grouping work on this case
+         return ref;
+      }
+      SimpleString groupID = extractGroupID(ref);
       if (exclusive) {
-         if (groupConsumer == null) {
+         if (newGroup) {
             exclusiveConsumer = consumer;
             if (groupFirstKey != null) {
                return new GroupFirstMessageReference(groupFirstKey, ref);
@@ -3282,7 +3268,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
          if (extractGroupSequence(ref) == -1) {
             groups.remove(groupID);
             consumers.repeat();
-         } else if (groupConsumer == null) {
+         } else if (newGroup) {
             groups.put(groupID, consumer);
             if (groupFirstKey != null) {
                return new GroupFirstMessageReference(groupFirstKey, ref);
@@ -3294,13 +3280,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       return ref;
    }
 
-   private void proceedDeliver(Consumer consumer, MessageReference reference) {
+   private void afterDeliver(Consumer consumer, MessageReference reference) {
       try {
-         consumer.proceedDeliver(reference);
+         consumer.afterDeliver(reference);
       } catch (Throwable t) {
          errorProcessing(consumer, t, reference);
-      } finally {
-         deliveriesInTransit.countDown();
       }
    }
 
@@ -3345,10 +3329,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       }
    }
 
-   private synchronized HandleStatus handle(final MessageReference reference, final Consumer consumer) {
-      HandleStatus status;
+   private synchronized Object handle(final MessageReference reference, final Consumer consumer, boolean newGroup) {
+      Object status;
       try {
-         status = consumer.handle(reference);
+         status = consumer.handleWithGroup(this, newGroup, reference);
       } catch (Throwable t) {
          ActiveMQServerLogger.LOGGER.removingBadConsumer(t, consumer, reference);
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index c709d4e..dde8d87 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -24,11 +24,8 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
@@ -107,13 +104,6 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
 
    private SlowConsumerDetectionListener slowConsumerListener;
 
-   /**
-    * We get a readLock when a message is handled, and return the readLock when the message is finally delivered
-    * When stopping the consumer we need to get a writeLock to make sure we had all delivery finished
-    * otherwise a rollback may get message sneaking in
-    */
-   private final ReadWriteLock lockDelivery = new ReentrantReadWriteLock();
-
    private volatile AtomicInteger availableCredits = new AtomicInteger(0);
 
    private boolean started;
@@ -392,8 +382,20 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
       messageQueue.errorProcessing(this, e, deliveryObject);
    }
 
+   /** This is in case someone is still using the old API directly */
+   @Override
+   public HandleStatus handle(MessageReference ref) throws Exception {
+      Object refReturn = handleWithGroup(null, false, ref);
+
+      if (refReturn instanceof MessageReference) {
+         return HandleStatus.HANDLED;
+      } else {
+         return (HandleStatus) refReturn;
+      }
+
+   }
    @Override
-   public HandleStatus handle(final MessageReference ref) throws Exception {
+   public Object handleWithGroup(GroupHandler handler, boolean newGroup, final MessageReference ref) throws Exception {
       // available credits can be set back to null with a flow control option.
       AtomicInteger checkInteger = availableCredits;
       if (callback != null && !callback.hasCredits(this) || checkInteger != null && checkInteger.get() <= 0) {
@@ -481,42 +483,46 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
 
          }
 
-         lockDelivery.readLock().lock();
+         MessageReference deliveryReference = ref;
+
+         if (handler != null) {
+            deliveryReference = handler.handleMessageGroup(ref, this, newGroup);
+         }
+
+         proceedDeliver(deliveryReference);
 
          return HandleStatus.HANDLED;
       }
    }
 
-   @Override
-   public void proceedDeliver(MessageReference reference) throws Exception {
-      try {
-         Message message = reference.getMessage();
+   private void proceedDeliver(MessageReference reference) throws Exception {
+      Message message = reference.getMessage();
 
-         if (server.hasBrokerMessagePlugins()) {
-            server.callBrokerMessagePlugins(plugin -> plugin.beforeDeliver(this, reference));
-         }
+      if (server.hasBrokerMessagePlugins()) {
+         server.callBrokerMessagePlugins(plugin -> plugin.beforeDeliver(this, reference));
+      }
 
-         if (message.isLargeMessage() && supportLargeMessage) {
-            if (largeMessageDeliverer == null) {
-               // This can't really happen as handle had already crated the deliverer
-               // instead of throwing an exception in weird cases there is no problem on just go ahead and create it
-               // again here
-               largeMessageDeliverer = new LargeMessageDeliverer((LargeServerMessage) message, reference);
-            }
-            // The deliverer was prepared during handle, as we can't have more than one pending large message
-            // as it would return busy if there is anything pending
-            largeMessageDeliverer.deliver();
-         } else {
-            deliverStandardMessage(reference, message);
-         }
-      } finally {
-         lockDelivery.readLock().unlock();
-         callback.afterDelivery();
-         if (server.hasBrokerMessagePlugins()) {
-            server.callBrokerMessagePlugins(plugin -> plugin.afterDeliver(this, reference));
+      if (message.isLargeMessage() && supportLargeMessage) {
+         if (largeMessageDeliverer == null) {
+            // This can't really happen as handle had already created the deliverer
+            // instead of throwing an exception in weird cases there is no problem on just go ahead and create it
+            // again here
+            largeMessageDeliverer = new LargeMessageDeliverer((LargeServerMessage) message, reference);
          }
+         // The deliverer was prepared during handle, as we can't have more than one pending large message
+         // as it would return busy if there is anything pending
+         largeMessageDeliverer.deliver();
+      } else {
+         deliverStandardMessage(reference, message);
       }
+   }
 
+   @Override
+   public void afterDeliver(MessageReference reference) throws Exception {
+      callback.afterDeliver(reference, reference.getMessage(), ServerConsumerImpl.this, reference.getDeliveryCount());
+      if (server.hasBrokerMessagePlugins()) {
+         server.callBrokerMessagePlugins(plugin -> plugin.afterDeliver(this, reference));
+      }
    }
 
    @Override
@@ -626,7 +632,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
     * there are no other messages to be delivered.
     */
    @Override
-   public void forceDelivery(final long sequence)  {
+   public void forceDelivery(final long sequence) {
       forceDelivery(sequence, () -> {
          Message forcedDeliveryMessage = new CoreMessage(storageManager.generateID(), 50);
          MessageReference reference = MessageReference.Factory.createReference(forcedDeliveryMessage, messageQueue);
@@ -730,19 +736,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
    @Override
    public void setStarted(final boolean started) {
       synchronized (lock) {
-         boolean locked = lockDelivery();
-
-         // This is to make sure nothing would sneak to the client while started = false
-         // the client will stop the session and perform a rollback in certain cases.
-         // in case something sneaks to the client you could get to messaging delivering forever until
-         // you restart the server
-         try {
-            this.started = browseOnly || started;
-         } finally {
-            if (locked) {
-               lockDelivery.writeLock().unlock();
-            }
-         }
+         this.started = browseOnly || started;
       }
 
       // Outside the lock
@@ -751,35 +745,10 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
       }
    }
 
-   private boolean lockDelivery() {
-      try {
-         if (!lockDelivery.writeLock().tryLock(30, TimeUnit.SECONDS)) {
-            ActiveMQServerLogger.LOGGER.timeoutLockingConsumer();
-            if (server != null) {
-               server.threadDump();
-            }
-            return false;
-         }
-         return true;
-      } catch (Exception e) {
-         ActiveMQServerLogger.LOGGER.failedToFinishDelivery(e);
-         return false;
-      }
-   }
-
    @Override
    public void setTransferring(final boolean transferring) {
       synchronized (lock) {
-         // This is to make sure that the delivery process has finished any pending delivery
-         // otherwise a message may sneak in on the client while we are trying to stop the consumer
-         boolean locked = lockDelivery();
-         try {
-            this.transferring = transferring;
-         } finally {
-            if (locked) {
-               lockDelivery.writeLock().unlock();
-            }
-         }
+         this.transferring = transferring;
       }
 
       // Outside the lock
@@ -1275,125 +1244,111 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
       }
 
       public boolean deliver() throws Exception {
-         lockDelivery.readLock().lock();
-         try {
-            if (!started) {
-               return false;
-            }
+         if (!started) {
+            return false;
+         }
 
-            LargeServerMessage currentLargeMessage = largeMessage;
-            if (currentLargeMessage == null) {
-               return true;
-            }
+         LargeServerMessage currentLargeMessage = largeMessage;
+         if (currentLargeMessage == null) {
+            return true;
+         }
 
-            if (availableCredits != null && availableCredits.get() <= 0) {
-               if (logger.isTraceEnabled()) {
-                  logger.trace(this + "::FlowControl::delivery largeMessage interrupting as there are no more credits, available=" +
-                                  availableCredits);
-               }
-               releaseHeapBodyBuffer();
-               return false;
+         if (availableCredits != null && availableCredits.get() <= 0) {
+            if (logger.isTraceEnabled()) {
+               logger.trace(this + "::FlowControl::delivery largeMessage interrupting as there are no more credits, available=" + availableCredits);
             }
+            releaseHeapBodyBuffer();
+            return false;
+         }
 
-            if (!sentInitialPacket) {
-               context = currentLargeMessage.getBodyEncoder();
+         if (!sentInitialPacket) {
+            context = currentLargeMessage.getBodyEncoder();
 
-               sizePendingLargeMessage = context.getLargeBodySize();
+            sizePendingLargeMessage = context.getLargeBodySize();
 
-               context.open();
+            context.open();
 
-               sentInitialPacket = true;
+            sentInitialPacket = true;
 
-               int packetSize = callback.sendLargeMessage(ref, currentLargeMessage, ServerConsumerImpl.this, context.getLargeBodySize(), ref.getDeliveryCount());
+            int packetSize = callback.sendLargeMessage(ref, currentLargeMessage, ServerConsumerImpl.this, context.getLargeBodySize(), ref.getDeliveryCount());
 
-               if (availableCredits != null) {
-                  final int credits = availableCredits.addAndGet(-packetSize);
+            if (availableCredits != null) {
+               final int credits = availableCredits.addAndGet(-packetSize);
 
-                  if (credits <= 0) {
-                     releaseHeapBodyBuffer();
-                  }
+               if (credits <= 0) {
+                  releaseHeapBodyBuffer();
+               }
 
-                  if (logger.isTraceEnabled()) {
-                     logger.trace(this + "::FlowControl::" +
-                                     " deliver initialpackage with " +
-                                     packetSize +
-                                     " delivered, available now = " +
-                                     availableCredits);
-                  }
+               if (logger.isTraceEnabled()) {
+                  logger.trace(this + "::FlowControl::" + " deliver initialpackage with " + packetSize + " delivered, available now = " + availableCredits);
                }
+            }
 
-               // Execute the rest of the large message on a different thread so as not to tie up the delivery thread
-               // for too long
+            // Execute the rest of the large message on a different thread so as not to tie up the delivery thread
+            // for too long
 
-               resumeLargeMessage();
+            resumeLargeMessage();
 
-               return false;
-            } else {
-               if (availableCredits != null && availableCredits.get() <= 0) {
-                  if (logger.isTraceEnabled()) {
-                     logger.trace(this + "::FlowControl::deliverLargeMessage Leaving loop of send LargeMessage because of credits, available=" +
-                                     availableCredits);
-                  }
-                  releaseHeapBodyBuffer();
-                  return false;
+            return false;
+         } else {
+            if (availableCredits != null && availableCredits.get() <= 0) {
+               if (logger.isTraceEnabled()) {
+                  logger.trace(this + "::FlowControl::deliverLargeMessage Leaving loop of send LargeMessage because of credits, available=" + availableCredits);
                }
+               releaseHeapBodyBuffer();
+               return false;
+            }
 
-               final int localChunkLen = (int) Math.min(sizePendingLargeMessage - positionPendingLargeMessage, minLargeMessageSize);
+            final int localChunkLen = (int) Math.min(sizePendingLargeMessage - positionPendingLargeMessage, minLargeMessageSize);
 
-               final ByteBuffer bodyBuffer = acquireHeapBodyBuffer(localChunkLen);
+            final ByteBuffer bodyBuffer = acquireHeapBodyBuffer(localChunkLen);
 
-               assert bodyBuffer.remaining() == localChunkLen;
+            assert bodyBuffer.remaining() == localChunkLen;
 
-               final int readBytes = context.encode(bodyBuffer);
+            final int readBytes = context.encode(bodyBuffer);
 
-               assert readBytes == localChunkLen;
+            assert readBytes == localChunkLen;
 
-               final byte[] body = bodyBuffer.array();
+            final byte[] body = bodyBuffer.array();
 
-               assert body.length == readBytes;
+            assert body.length == readBytes;
 
-               //It is possible to recycle the same heap body buffer because it won't be cached by sendLargeMessageContinuation
-               //given that requiresResponse is false: ChannelImpl::send will use the resend cache only if
-               //resendCache != null && packet.isRequiresConfirmations()
+            //It is possible to recycle the same heap body buffer because it won't be cached by sendLargeMessageContinuation
+            //given that requiresResponse is false: ChannelImpl::send will use the resend cache only if
+            //resendCache != null && packet.isRequiresConfirmations()
 
-               int packetSize = callback.sendLargeMessageContinuation(ServerConsumerImpl.this, body, positionPendingLargeMessage + localChunkLen < sizePendingLargeMessage, false);
+            int packetSize = callback.sendLargeMessageContinuation(ServerConsumerImpl.this, body, positionPendingLargeMessage + localChunkLen < sizePendingLargeMessage, false);
 
-               int chunkLen = body.length;
+            int chunkLen = body.length;
 
-               if (availableCredits != null) {
-                  final int credits = availableCredits.addAndGet(-packetSize);
+            if (availableCredits != null) {
+               final int credits = availableCredits.addAndGet(-packetSize);
 
-                  if (credits <= 0) {
-                     releaseHeapBodyBuffer();
-                  }
+               if (credits <= 0) {
+                  releaseHeapBodyBuffer();
+               }
 
-                  if (logger.isTraceEnabled()) {
-                     logger.trace(this + "::FlowControl::largeMessage deliver continuation, packetSize=" +
-                                     packetSize +
-                                     " available now=" +
-                                     availableCredits);
-                  }
+               if (logger.isTraceEnabled()) {
+                  logger.trace(this + "::FlowControl::largeMessage deliver continuation, packetSize=" + packetSize + " available now=" + availableCredits);
                }
+            }
 
-               positionPendingLargeMessage += chunkLen;
+            positionPendingLargeMessage += chunkLen;
 
-               if (positionPendingLargeMessage < sizePendingLargeMessage) {
-                  resumeLargeMessage();
+            if (positionPendingLargeMessage < sizePendingLargeMessage) {
+               resumeLargeMessage();
 
-                  return false;
-               }
+               return false;
             }
+         }
 
-            if (logger.isTraceEnabled()) {
-               logger.trace("Finished deliverLargeMessage");
-            }
+         if (logger.isTraceEnabled()) {
+            logger.trace("Finished deliverLargeMessage");
+         }
 
-            finish();
+         finish();
 
-            return true;
-         } finally {
-            lockDelivery.readLock().unlock();
-         }
+         return true;
       }
 
       public void finish() throws Exception {
@@ -1453,7 +1408,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
                }
 
                if (status == HandleStatus.HANDLED) {
-                  proceedDeliver(current);
+                  afterDeliver(current);
                }
 
                current = null;
@@ -1481,7 +1436,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
                }
 
                if (status == HandleStatus.HANDLED) {
-                  proceedDeliver(ref);
+                  afterDeliver(ref);
                } else if (status == HandleStatus.BUSY) {
                   // keep a reference on the current message reference
                   // to handle it next time the browser deliverer is executed
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
index 5577522..18ef253 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
@@ -41,11 +41,10 @@ public interface SessionCallback {
     */
    boolean hasCredits(ServerConsumer consumerID);
 
-   /**
-    * This can be used to complete certain operations outside of the lock,
-    * like acks or other operations.
-    */
-   void afterDelivery() throws Exception;
+   // Certain protocols (MQTT) will need to confirm messages, doing things such as individual ACKs,
+   // and these need to be done outside of the main lock;
+   // otherwise we could deadlock during delivery.
+   void afterDeliver(MessageReference ref, Message message, ServerConsumer consumerID, int deliveryCount) throws Exception;
 
    /**
     * Use this to updates specifics on the message after a redelivery happened.
@@ -69,6 +68,7 @@ public interface SessionCallback {
    //       Future developments may change this, but beware why I have chosen to keep the parameter separated here
    int sendMessage(MessageReference ref, Message message, ServerConsumer consumerID, int deliveryCount);
 
+
    int sendLargeMessage(MessageReference reference,
                         Message message,
                         ServerConsumer consumerID,
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
index 58bf2d3..9bc0ea2 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
@@ -190,7 +190,7 @@ public class DummyServerConsumer implements ServerConsumer {
    }
 
    @Override
-   public void proceedDeliver(MessageReference reference) throws Exception {
+   public void afterDeliver(MessageReference reference) throws Exception {
 
    }
 
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
index 3e64ac5..0006752 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
@@ -528,11 +528,6 @@ public class HangConsumerTest extends ActiveMQTestBase {
       }
 
       @Override
-      public void afterDelivery() throws Exception {
-
-      }
-
-      @Override
       public void sendProducerCreditsFailMessage(int credits, SimpleString address) {
          targetCallback.sendProducerCreditsFailMessage(credits, address);
       }
@@ -558,6 +553,11 @@ public class HangConsumerTest extends ActiveMQTestBase {
          }
       }
 
+      @Override
+      public void afterDeliver(MessageReference ref, Message message, ServerConsumer consumerID, int deliveryCount) {
+
+      }
+
       /* (non-Javadoc)
        * @see SessionCallback#sendLargeMessage(org.apache.activemq.artemis.core.server.ServerMessage, long, long, int)
        */
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/GroupingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/GroupingTest.java
index ba8cd95..fc32c44 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/GroupingTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/GroupingTest.java
@@ -271,7 +271,6 @@ public class GroupingTest extends JMSTestBase {
 
          assertEquals(tm.getStringProperty("JMSXGroupID"), jmsxgroupID);
       }
-      Thread.sleep(2000);
       //session.rollback();
       //session.close();
       //consume all msgs from 2nd first consumer
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java
index f20cce3..076e164 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java
@@ -1312,12 +1312,21 @@ public class QueueImplTest extends ActiveMQTestBase {
 
          @Override
          public synchronized HandleStatus handle(MessageReference reference) {
+            return HandleStatus.HANDLED;
+         }
+
+         @Override
+         public Object handleWithGroup(GroupHandler groupHandler, boolean newGroup, MessageReference reference) throws Exception {
             if (count == 0) {
                //the first message is handled and will be used to determine this consumer
                //to be the group consumer
                count++;
                firstMessageHandled.countDown();
-               return HandleStatus.HANDLED;
+               if (groupHandler != null) {
+                  return groupHandler.handleMessageGroup(reference, this, newGroup);
+               } else {
+                  return HandleStatus.HANDLED;
+               }
             } else if (count <= 2) {
                //the next two attempts to send the second message will be done
                //attempting a direct delivery and an async one after that
@@ -1329,7 +1338,11 @@ public class QueueImplTest extends ActiveMQTestBase {
                //the second message should have stop the delivery loop:
                //it will succeed just to let the message being handled and
                //reduce the message count to 0
-               return HandleStatus.HANDLED;
+               if (groupHandler != null) {
+                  return groupHandler.handleMessageGroup(reference, this, newGroup);
+               } else {
+                  return HandleStatus.HANDLED;
+               }
             }
          }
       };
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java
index 2a5a330..47b042b 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java
@@ -138,7 +138,7 @@ public class FakeConsumer implements Consumer {
    }
 
    @Override
-   public void proceedDeliver(MessageReference ref) throws Exception {
+   public void afterDeliver(MessageReference ref) throws Exception {
       // no op
    }