You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2019/07/27 03:56:31 UTC

[activemq-artemis] branch master updated (f269b79 -> f73d7f5)

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git.


    from f269b79  This closes #2765
     new d2d2151  Revert "ARTEMIS-2434 Don't lock ServerConsumerImpl for long period of time"
     new 7507a9f  ARTEMIS-2423 Improving Consumer/Queue Delivery lock
     new f73d7f5  This closes #2772

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../protocol/amqp/broker/AMQPSessionCallback.java  |  10 +-
 .../core/protocol/mqtt/MQTTPublishManager.java     |  24 +-
 .../core/protocol/mqtt/MQTTSessionCallback.java    |  14 +-
 .../core/protocol/openwire/amq/AMQSession.java     |  14 +-
 .../artemis/core/protocol/stomp/StompSession.java  |   3 +-
 .../impl/ManagementRemotingConnection.java         |  10 +-
 .../protocol/core/impl/CoreSessionCallback.java    |  10 +-
 .../activemq/artemis/core/server/Consumer.java     |  32 ++-
 .../core/server/cluster/impl/BridgeImpl.java       |   2 +-
 .../core/server/cluster/impl/Redistributor.java    |   2 +-
 .../artemis/core/server/impl/QueueImpl.java        | 110 ++++----
 .../core/server/impl/ServerConsumerImpl.java       | 296 +++++++++------------
 .../artemis/spi/core/protocol/SessionCallback.java |  10 +-
 .../tests/integration/cli/DummyServerConsumer.java |   2 +-
 .../tests/integration/client/HangConsumerTest.java |  10 +-
 .../tests/integration/jms/client/GroupingTest.java |   1 -
 .../tests/unit/core/server/impl/QueueImplTest.java |  17 +-
 .../unit/core/server/impl/fakes/FakeConsumer.java  |   2 +-
 18 files changed, 272 insertions(+), 297 deletions(-)


[activemq-artemis] 03/03: This closes #2772

Posted by cl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit f73d7f5dd3d0e4bd6ebf0b1b7e63f04477b1db4e
Merge: f269b79 7507a9f
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Fri Jul 26 23:56:22 2019 -0400

    This closes #2772

 .../protocol/amqp/broker/AMQPSessionCallback.java  |  10 +-
 .../core/protocol/mqtt/MQTTPublishManager.java     |  24 +-
 .../core/protocol/mqtt/MQTTSessionCallback.java    |  14 +-
 .../core/protocol/openwire/amq/AMQSession.java     |  14 +-
 .../artemis/core/protocol/stomp/StompSession.java  |   3 +-
 .../impl/ManagementRemotingConnection.java         |  10 +-
 .../protocol/core/impl/CoreSessionCallback.java    |  10 +-
 .../activemq/artemis/core/server/Consumer.java     |  32 ++-
 .../core/server/cluster/impl/BridgeImpl.java       |   2 +-
 .../core/server/cluster/impl/Redistributor.java    |   2 +-
 .../artemis/core/server/impl/QueueImpl.java        | 110 ++++----
 .../core/server/impl/ServerConsumerImpl.java       | 296 +++++++++------------
 .../artemis/spi/core/protocol/SessionCallback.java |  10 +-
 .../tests/integration/cli/DummyServerConsumer.java |   2 +-
 .../tests/integration/client/HangConsumerTest.java |  10 +-
 .../tests/integration/jms/client/GroupingTest.java |   1 -
 .../tests/unit/core/server/impl/QueueImplTest.java |  17 +-
 .../unit/core/server/impl/fakes/FakeConsumer.java  |   2 +-
 18 files changed, 272 insertions(+), 297 deletions(-)


[activemq-artemis] 01/03: Revert "ARTEMIS-2434 Don't lock ServerConsumerImpl for long period of time"

Posted by cl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit d2d21516ba79c350f4a7827c97435aaaaea15311
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Fri Jul 26 13:57:05 2019 -0400

    Revert "ARTEMIS-2434 Don't lock ServerConsumerImpl for long period of time"
    
    This reverts commit 64ba930f43b8fa3a78730d875d7e354cb9631675.
---
 .../core/server/impl/ServerConsumerImpl.java       | 79 ++++++++++------------
 1 file changed, 34 insertions(+), 45 deletions(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index e19b1e5..c709d4e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -27,7 +27,6 @@ import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.LockSupport;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -730,68 +729,58 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
 
    @Override
    public void setStarted(final boolean started) {
-      lockDelivery(locked -> {
+      synchronized (lock) {
+         boolean locked = lockDelivery();
+
          // This is to make sure nothing would sneak to the client while started = false
          // the client will stop the session and perform a rollback in certain cases.
          // in case something sneaks to the client you could get to messaging delivering forever until
          // you restart the server
-         this.started = browseOnly || started;
-      });
+         try {
+            this.started = browseOnly || started;
+         } finally {
+            if (locked) {
+               lockDelivery.writeLock().unlock();
+            }
+         }
+      }
+
       // Outside the lock
       if (started) {
          promptDelivery();
       }
    }
 
-   private static final long LOCK_DELIVERY_TIMEOUT_NS = TimeUnit.SECONDS.toNanos(30);
-   private static final long TRY_LOCK_NS = TimeUnit.MILLISECONDS.toNanos(100);
-
-   private boolean lockDelivery(java.util.function.Consumer<Boolean> task) {
-      final long startWait = System.nanoTime();
-      long now;
-      while (((now = System.nanoTime()) - startWait) < LOCK_DELIVERY_TIMEOUT_NS) {
-         try {
-            if (Thread.currentThread().isInterrupted()) {
-               throw new InterruptedException();
-            }
-         } catch (Exception e) {
-            ActiveMQServerLogger.LOGGER.failedToFinishDelivery(e);
-            synchronized (lock) {
-               task.accept(false);
+   private boolean lockDelivery() {
+      try {
+         if (!lockDelivery.writeLock().tryLock(30, TimeUnit.SECONDS)) {
+            ActiveMQServerLogger.LOGGER.timeoutLockingConsumer();
+            if (server != null) {
+               server.threadDump();
             }
             return false;
          }
-         synchronized (lock) {
-            if (lockDelivery.writeLock().tryLock()) {
-               try {
-                  task.accept(true);
-               } finally {
-                  lockDelivery.writeLock().unlock();
-               }
-               return true;
-            }
-         }
-         //entering the lock can take some time: discount that time from the
-         //time before attempting to lock delivery
-         final long timeToLock = System.nanoTime() - now;
-         if (timeToLock < TRY_LOCK_NS) {
-            final long timeToWait = TRY_LOCK_NS - timeToLock;
-            LockSupport.parkNanos(timeToWait);
-         }
-      }
-      ActiveMQServerLogger.LOGGER.timeoutLockingConsumer();
-      if (server != null) {
-         server.threadDump();
-      }
-      synchronized (lock) {
-         task.accept(false);
+         return true;
+      } catch (Exception e) {
+         ActiveMQServerLogger.LOGGER.failedToFinishDelivery(e);
+         return false;
       }
-      return false;
    }
 
    @Override
    public void setTransferring(final boolean transferring) {
-      lockDelivery(locked -> this.transferring = transferring);
+      synchronized (lock) {
+         // This is to make sure that the delivery process has finished any pending delivery
+         // otherwise a message may sneak in on the client while we are trying to stop the consumer
+         boolean locked = lockDelivery();
+         try {
+            this.transferring = transferring;
+         } finally {
+            if (locked) {
+               lockDelivery.writeLock().unlock();
+            }
+         }
+      }
 
       // Outside the lock
       if (transferring) {


[activemq-artemis] 02/03: ARTEMIS-2423 Improving Consumer/Queue Delivery lock

Posted by cl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 7507a9fd4b282523c2b2f3517ed788153a35df4c
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Thu Jul 25 12:10:24 2019 -0400

    ARTEMIS-2423 Improving Consumer/Queue Delivery lock
---
 .../protocol/amqp/broker/AMQPSessionCallback.java  |  10 +-
 .../core/protocol/mqtt/MQTTPublishManager.java     |  24 +-
 .../core/protocol/mqtt/MQTTSessionCallback.java    |  14 +-
 .../core/protocol/openwire/amq/AMQSession.java     |  14 +-
 .../artemis/core/protocol/stomp/StompSession.java  |   3 +-
 .../impl/ManagementRemotingConnection.java         |  10 +-
 .../protocol/core/impl/CoreSessionCallback.java    |  10 +-
 .../activemq/artemis/core/server/Consumer.java     |  32 ++-
 .../core/server/cluster/impl/BridgeImpl.java       |   2 +-
 .../core/server/cluster/impl/Redistributor.java    |   2 +-
 .../artemis/core/server/impl/QueueImpl.java        | 110 ++++----
 .../core/server/impl/ServerConsumerImpl.java       | 277 +++++++++------------
 .../artemis/spi/core/protocol/SessionCallback.java |  10 +-
 .../tests/integration/cli/DummyServerConsumer.java |   2 +-
 .../tests/integration/client/HangConsumerTest.java |  10 +-
 .../tests/integration/jms/client/GroupingTest.java |   1 -
 .../tests/unit/core/server/impl/QueueImplTest.java |  17 +-
 .../unit/core/server/impl/fakes/FakeConsumer.java  |   2 +-
 18 files changed, 268 insertions(+), 282 deletions(-)

diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index f850cc1..616da5b 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -186,11 +186,6 @@ public class AMQPSessionCallback implements SessionCallback {
                                                         (String) null, this, true, operationContext, manager.getPrefixes());
    }
 
-   @Override
-   public void afterDelivery() throws Exception {
-
-   }
-
    public void start() {
 
    }
@@ -605,6 +600,11 @@ public class AMQPSessionCallback implements SessionCallback {
    }
 
    @Override
+   public void afterDeliver(MessageReference ref, Message message, ServerConsumer consumerID, int deliveryCount) {
+
+   }
+
+   @Override
    public int sendLargeMessage(MessageReference ref,
                                Message message,
                                ServerConsumer consumer,
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
index abcfe3f..bb38539 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
@@ -59,6 +59,12 @@ public class MQTTPublishManager {
 
    private MQTTSessionState.OutboundStore outboundStore;
 
+   /** this is the last qos that happened during delivery.
+    *  since afterDelivery will happen in the same thread, no other threads should be calling delivery and afterDelivery
+    *  so it is safe to keep this value here.
+    */
+   private Integer currentQos;
+
    public MQTTPublishManager(MQTTSession session) {
       this.session = session;
    }
@@ -108,7 +114,6 @@ public class MQTTPublishManager {
    boolean isManagementConsumer(ServerConsumer consumer) {
       return consumer == managementConsumer;
    }
-
    /**
     * Since MQTT Subscriptions can over lap; a client may receive the same message twice.  When this happens the client
     * returns a PubRec or PubAck with ID.  But we need to know which consumer to ack, since we only have the ID to go on we
@@ -119,20 +124,35 @@ public class MQTTPublishManager {
    protected void sendMessage(ICoreMessage message, ServerConsumer consumer, int deliveryCount) throws Exception {
       // This is to allow retries of PubRel.
       if (isManagementConsumer(consumer)) {
+         currentQos = null;
          sendPubRelMessage(message);
       } else {
          int qos = decideQoS(message, consumer);
+         currentQos = qos;
          if (qos == 0) {
             sendServerMessage((int) message.getMessageID(),  message, deliveryCount, qos);
-            session.getServerSession().individualAcknowledge(consumer.getID(), message.getMessageID());
          } else if (qos == 1 || qos == 2) {
             int mqttid = outboundStore.generateMqttId(message.getMessageID(), consumer.getID());
             outboundStore.publish(mqttid, message.getMessageID(), consumer.getID());
             sendServerMessage(mqttid, message, deliveryCount, qos);
          } else {
+            // this will happen during afterDeliver
+         }
+      }
+   }
+
+   protected void confirmMessage(ICoreMessage message, ServerConsumer consumer, int deliveryCount) throws Exception {
+      if (currentQos != null) {
+         int qos = currentQos.intValue();
+         if (qos == 0) {
+            session.getServerSession().individualAcknowledge(consumer.getID(), message.getMessageID());
+         } else if (qos == 1 || qos == 2) {
+            // everything happened in delivery
+         } else {
             // Client must have disconnected and it's Subscription QoS cleared
             consumer.individualCancel(message.getMessageID(), false);
          }
+
       }
    }
 
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
index 50d5732..168b7fa 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
@@ -60,6 +60,15 @@ public class MQTTSessionCallback implements SessionCallback {
    }
 
    @Override
+   public void afterDeliver(MessageReference ref, Message message, ServerConsumer consumer, int deliveryCount) {
+      try {
+         session.getMqttPublishManager().confirmMessage(message.toCore(), consumer, deliveryCount);
+      } catch (Exception e) {
+         log.warn("Unable to send message: " + message.getMessageID() + " Cause: " + e.getMessage(), e);
+      }
+   }
+
+   @Override
    public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed) {
       return false;
    }
@@ -91,11 +100,6 @@ public class MQTTSessionCallback implements SessionCallback {
    }
 
    @Override
-   public void afterDelivery() throws Exception {
-
-   }
-
-   @Override
    public void browserFinished(ServerConsumer consumer) {
 
    }
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index b780563..e4ecd48 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -270,12 +270,6 @@ public class AMQSession implements SessionCallback {
 
    }
 
-   // rename actualDest to destination
-   @Override
-   public void afterDelivery() throws Exception {
-
-   }
-
    @Override
    public void browserFinished(ServerConsumer consumer) {
       AMQConsumer theConsumer = (AMQConsumer) consumer.getProtocolData();
@@ -313,6 +307,14 @@ public class AMQSession implements SessionCallback {
    }
 
    @Override
+   public void afterDeliver(MessageReference ref,
+                            org.apache.activemq.artemis.api.core.Message message,
+                            ServerConsumer consumerID,
+                            int deliveryCount) {
+
+   }
+
+   @Override
    public int sendLargeMessage(MessageReference reference,
                                org.apache.activemq.artemis.api.core.Message message,
                                ServerConsumer consumerID,
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
index 80bbbe8..03f9b44 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
@@ -110,8 +110,9 @@ public class StompSession implements SessionCallback {
    public void sendProducerCreditsFailMessage(int credits, SimpleString address) {
    }
 
+
    @Override
-   public void afterDelivery() throws Exception {
+   public void afterDeliver(MessageReference ref, Message message, ServerConsumer consumerID, int deliveryCount) throws Exception {
       PendingTask task;
       while ((task = afterDeliveryTasks.poll()) != null) {
          task.run();
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ManagementRemotingConnection.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ManagementRemotingConnection.java
index 7e760c1..35eab8a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ManagementRemotingConnection.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ManagementRemotingConnection.java
@@ -212,11 +212,6 @@ public class ManagementRemotingConnection implements RemotingConnection {
       }
 
       @Override
-      public void afterDelivery() throws Exception {
-
-      }
-
-      @Override
       public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed) {
          return false;
       }
@@ -237,6 +232,11 @@ public class ManagementRemotingConnection implements RemotingConnection {
       }
 
       @Override
+      public void afterDeliver(MessageReference ref, Message message, ServerConsumer consumerID, int deliveryCount) {
+
+      }
+
+      @Override
       public int sendLargeMessage(MessageReference reference,
                                   Message message,
                                   ServerConsumer consumerID,
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
index f53d028..fce0dd5 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
@@ -133,6 +133,11 @@ public final class CoreSessionCallback implements SessionCallback {
    }
 
    @Override
+   public void afterDeliver(MessageReference ref, Message message, ServerConsumer consumerID, int deliveryCount) {
+      // no op
+   }
+
+   @Override
    public void sendProducerCreditsMessage(int credits, SimpleString address) {
       Packet packet = new SessionProducerCreditsMessage(credits, address);
 
@@ -145,11 +150,6 @@ public final class CoreSessionCallback implements SessionCallback {
    }
 
    @Override
-   public void afterDelivery() throws Exception {
-
-   }
-
-   @Override
    public void sendProducerCreditsFailMessage(int credits, SimpleString address) {
       Packet packet = new SessionProducerCreditsFailMessage(credits, address);
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java
index babddc2..269c74f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java
@@ -25,6 +25,10 @@ import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
 
 public interface Consumer extends PriorityAware {
 
+   interface GroupHandler {
+      MessageReference handleMessageGroup(MessageReference ref, Consumer consumer, boolean newGroup);
+   }
+
    /**
     *
     * @see SessionCallback#supportsDirectDelivery()
@@ -34,13 +38,7 @@ public interface Consumer extends PriorityAware {
    }
 
    /**
-    * There was a change on semantic during 2.3 here.<br>
-    * We now first accept the message, and the actual deliver is done as part of
-    * {@link #proceedDeliver(MessageReference)}. This is to avoid holding a lock on the queues while
-    * the delivery is being accomplished To avoid a lock on the queue in case of misbehaving
-    * consumers.
-    * <p>
-    * This should return busy if handle is called before proceed deliver is called
+
     *
     * @param reference
     * @return
@@ -48,19 +46,29 @@ public interface Consumer extends PriorityAware {
     */
    HandleStatus handle(MessageReference reference) throws Exception;
 
+   /**
+    * This will return {@link HandleStatus#BUSY} if busy, {@link HandleStatus#NO_MATCH} if no match, or the MessageReference is handled
+    * This should return busy if handle is called before proceed deliver is called
+    * @param groupHandler
+    * @param reference
+    * @return
+    * @throws Exception
+    */
+   default Object handleWithGroup(GroupHandler groupHandler, boolean newGroup, MessageReference reference) throws Exception {
+      return handle(reference);
+   }
+
    /** wakes up internal threads to deliver more messages */
    default void promptDelivery() {
    }
 
    /**
-    * This will proceed with the actual delivery.
-    * Notice that handle should hold a readLock and proceedDelivery should release the readLock
-    * any lock operation on Consumer should also get a writeLock on the readWriteLock
-    * to guarantee there are no pending deliveries
+    * This will called after delivery
+    * Giving protocols a chance to complete their deliveries doing things such as individualACK outside of main locks
     *
     * @throws Exception
     */
-   void proceedDeliver(MessageReference reference) throws Exception;
+   void afterDeliver(MessageReference reference) throws Exception;
 
    Filter getFilter();
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
index 7d5bbe8..4c73261 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
@@ -689,7 +689,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
    // FailureListener implementation --------------------------------
 
    @Override
-   public void proceedDeliver(MessageReference ref) {
+   public void afterDeliver(MessageReference ref) {
       // no op
    }
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
index 7982018..44a5e0b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
@@ -193,7 +193,7 @@ public class Redistributor implements Consumer {
    }
 
    @Override
-   public void proceedDeliver(MessageReference ref) {
+   public void afterDeliver(MessageReference ref) {
       // no op
    }
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 998ef8c..292d15f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -98,7 +98,6 @@ import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.utils.BooleanUtil;
 import org.apache.activemq.artemis.utils.Env;
 import org.apache.activemq.artemis.utils.ReferenceCounter;
-import org.apache.activemq.artemis.utils.ReusableLatch;
 import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
 import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
 import org.apache.activemq.artemis.utils.collections.PriorityLinkedList;
@@ -114,7 +113,7 @@ import org.jboss.logging.Logger;
  * <p>
  * Completely non blocking between adding to queue and delivering to consumers.
  */
-public class QueueImpl extends CriticalComponentImpl implements Queue {
+public class QueueImpl extends CriticalComponentImpl implements Queue, Consumer.GroupHandler {
 
    protected static final int CRITICAL_PATHS = 5;
    protected static final int CRITICAL_PATH_ADD_TAIL = 0;
@@ -268,8 +267,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
    private final ExpiryScanner expiryScanner = new ExpiryScanner();
 
-   private final ReusableLatch deliveriesInTransit = new ReusableLatch(0);
-
    private final AtomicLong queueRateCheckTime = new AtomicLong(System.currentTimeMillis());
 
    private final AtomicLong messagesAddedSnapshot = new AtomicLong(0);
@@ -955,7 +952,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
                // directDeliver flag to be re-computed resulting in direct delivery if the queue is empty
                // We don't recompute it on every delivery since executing isEmpty is expensive for a ConcurrentQueue
 
-               if (deliveriesInTransit.getCount() == 0 && getExecutor().isFlushed() &&
+               if (getExecutor().isFlushed() &&
                   intermediateMessageReferences.isEmpty() && messageReferences.isEmpty() &&
                   pageIterator != null && !pageIterator.hasNext() &&
                   pageSubscription != null && !pageSubscription.isPaging()) {
@@ -974,7 +971,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
             }
          }
 
-         if (direct && supportsDirectDeliver && directDeliver && deliveriesInTransit.getCount() == 0 && deliverDirect(ref)) {
+         if (direct && supportsDirectDeliver && directDeliver && deliverDirect(ref)) {
             return;
          }
 
@@ -1005,23 +1002,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       return false;
    }
 
-   /**
-    * This will wait for any pending deliveries to finish
-    */
-   private boolean flushDeliveriesInTransit() {
-      try {
-         if (deliveriesInTransit.await(DELIVERY_TIMEOUT)) {
-            return true;
-         } else {
-            ActiveMQServerLogger.LOGGER.timeoutFlushInTransit(getName().toString(), getAddress().toString());
-            return false;
-         }
-      } catch (Exception e) {
-         ActiveMQServerLogger.LOGGER.unableToFlushDeliveries(e);
-         return false;
-      }
-   }
-
    @Override
    public void forceDelivery() {
       if (pageSubscription != null && pageSubscription.isPaging()) {
@@ -2366,7 +2346,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
    @Override
    public synchronized void pause(boolean persist) {
       try {
-         this.flushDeliveriesInTransit();
          if (persist && isDurable()) {
             if (pauseStatusRecord >= 0) {
                storageManager.deleteQueueStatus(pauseStatusRecord);
@@ -2607,7 +2586,16 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
                   consumer = groupConsumer;
                }
 
-               HandleStatus status = handle(ref, consumer);
+               Object handleValue = handle(ref, consumer, groupConsumer == null);
+
+               HandleStatus status;
+
+               if (handleValue instanceof MessageReference) {
+                  ref = (MessageReference) handleValue;
+                  status = HandleStatus.HANDLED;
+               } else {
+                  status = (HandleStatus) handleValue;
+               }
 
                if (status == HandleStatus.HANDLED) {
 
@@ -2615,13 +2603,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
                   // this is to avoid breaks on the loop when checking for any other factors.
                   noDelivery = 0;
 
-                  if (redistributor == null) {
-                     ref = handleMessageGroup(ref, consumer, groupConsumer, groupID);
-                  }
-
-                  deliveriesInTransit.countUp();
-
-
                   removeMessageReference(holder, ref);
                   handledconsumer = consumer;
                   handled++;
@@ -2653,16 +2634,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
                // Round robin'd all
 
                if (noDelivery == this.consumers.size()) {
-                  if (handledconsumer != null) {
-                     // this shouldn't really happen,
-                     // however I'm keeping this as an assertion case future developers ever change the logic here on this class
-                     ActiveMQServerLogger.LOGGER.nonDeliveryHandled();
-                  } else {
-                     if (logger.isDebugEnabled()) {
-                        logger.debug(this + "::All the consumers were busy, giving up now");
-                     }
-                     break;
+                  if (logger.isDebugEnabled()) {
+                     logger.debug(this + "::All the consumers were busy, giving up now");
                   }
+                  break;
                }
 
                noDelivery = 0;
@@ -2670,7 +2645,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
          }
 
          if (handledconsumer != null) {
-            proceedDeliver(handledconsumer, ref);
+            afterDeliver(handledconsumer, ref);
          }
       }
 
@@ -3198,7 +3173,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       }
    }
 
-   private boolean deliver(final MessageReference ref) {
+   private boolean deliver(MessageReference ref) {
       synchronized (this) {
          if (!supportsDirectDeliver) {
             return false;
@@ -3225,20 +3200,24 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
                consumer = groupConsumer;
             }
 
-            HandleStatus status = handle(ref, consumer);
+            Object handleValue = handle(ref, consumer, groupConsumer == null);
 
-            if (status == HandleStatus.HANDLED) {
-               final MessageReference reference;
-               if (redistributor == null) {
-                  reference = handleMessageGroup(ref, consumer, groupConsumer, groupID);
-               } else {
-                  reference = ref;
-               }
 
+            HandleStatus status;
+
+            final MessageReference reference;
+            if (handleValue instanceof MessageReference) {
+               reference = (MessageReference) handleValue;
+               status = HandleStatus.HANDLED;
+            } else {
+               reference = ref;
+               status = (HandleStatus) handleValue;
+            }
+
+
+            if (status == HandleStatus.HANDLED) {
                messagesAdded.incrementAndGet();
 
-               deliveriesInTransit.countUp();
-               proceedDeliver(consumer, reference);
                consumers.reset();
                return true;
             }
@@ -3269,9 +3248,16 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       return groupConsumer;
    }
 
-   private MessageReference handleMessageGroup(MessageReference ref, Consumer consumer, Consumer groupConsumer, SimpleString groupID) {
+   /** This is {@link Consumer.GroupHandler#handleMessageGroup(MessageReference, Consumer, boolean)} */
+   @Override
+   public MessageReference handleMessageGroup(MessageReference ref, Consumer consumer, boolean newGroup) {
+      if (redistributor != null) {
+         // no grouping work on this case
+         return ref;
+      }
+      SimpleString groupID = extractGroupID(ref);
       if (exclusive) {
-         if (groupConsumer == null) {
+         if (newGroup) {
             exclusiveConsumer = consumer;
             if (groupFirstKey != null) {
                return new GroupFirstMessageReference(groupFirstKey, ref);
@@ -3282,7 +3268,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
          if (extractGroupSequence(ref) == -1) {
             groups.remove(groupID);
             consumers.repeat();
-         } else if (groupConsumer == null) {
+         } else if (newGroup) {
             groups.put(groupID, consumer);
             if (groupFirstKey != null) {
                return new GroupFirstMessageReference(groupFirstKey, ref);
@@ -3294,13 +3280,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       return ref;
    }
 
-   private void proceedDeliver(Consumer consumer, MessageReference reference) {
+   private void afterDeliver(Consumer consumer, MessageReference reference) {
       try {
-         consumer.proceedDeliver(reference);
+         consumer.afterDeliver(reference);
       } catch (Throwable t) {
          errorProcessing(consumer, t, reference);
-      } finally {
-         deliveriesInTransit.countDown();
       }
    }
 
@@ -3345,10 +3329,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       }
    }
 
-   private synchronized HandleStatus handle(final MessageReference reference, final Consumer consumer) {
-      HandleStatus status;
+   private synchronized Object handle(final MessageReference reference, final Consumer consumer, boolean newGroup) {
+      Object status;
       try {
-         status = consumer.handle(reference);
+         status = consumer.handleWithGroup(this, newGroup, reference);
       } catch (Throwable t) {
          ActiveMQServerLogger.LOGGER.removingBadConsumer(t, consumer, reference);
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index c709d4e..dde8d87 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -24,11 +24,8 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
@@ -107,13 +104,6 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
 
    private SlowConsumerDetectionListener slowConsumerListener;
 
-   /**
-    * We get a readLock when a message is handled, and return the readLock when the message is finally delivered
-    * When stopping the consumer we need to get a writeLock to make sure we had all delivery finished
-    * otherwise a rollback may get message sneaking in
-    */
-   private final ReadWriteLock lockDelivery = new ReentrantReadWriteLock();
-
    private volatile AtomicInteger availableCredits = new AtomicInteger(0);
 
    private boolean started;
@@ -392,8 +382,20 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
       messageQueue.errorProcessing(this, e, deliveryObject);
    }
 
+   /** This is in case someone is using direct old API */
+   @Override
+   public HandleStatus handle(MessageReference ref) throws Exception {
+      Object refReturn = handleWithGroup(null, false, ref);
+
+      if (refReturn instanceof MessageReference) {
+         return HandleStatus.HANDLED;
+      } else {
+         return (HandleStatus) refReturn;
+      }
+
+   }
    @Override
-   public HandleStatus handle(final MessageReference ref) throws Exception {
+   public Object handleWithGroup(GroupHandler handler, boolean newGroup, final MessageReference ref) throws Exception {
       // available credits can be set back to null with a flow control option.
       AtomicInteger checkInteger = availableCredits;
       if (callback != null && !callback.hasCredits(this) || checkInteger != null && checkInteger.get() <= 0) {
@@ -481,42 +483,46 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
 
          }
 
-         lockDelivery.readLock().lock();
+         MessageReference deliveryReference = ref;
+
+         if (handler != null) {
+            deliveryReference = handler.handleMessageGroup(ref, this, newGroup);
+         }
+
+         proceedDeliver(deliveryReference);
 
          return HandleStatus.HANDLED;
       }
    }
 
-   @Override
-   public void proceedDeliver(MessageReference reference) throws Exception {
-      try {
-         Message message = reference.getMessage();
+   private void proceedDeliver(MessageReference reference) throws Exception {
+      Message message = reference.getMessage();
 
-         if (server.hasBrokerMessagePlugins()) {
-            server.callBrokerMessagePlugins(plugin -> plugin.beforeDeliver(this, reference));
-         }
+      if (server.hasBrokerMessagePlugins()) {
+         server.callBrokerMessagePlugins(plugin -> plugin.beforeDeliver(this, reference));
+      }
 
-         if (message.isLargeMessage() && supportLargeMessage) {
-            if (largeMessageDeliverer == null) {
-               // This can't really happen as handle had already crated the deliverer
-               // instead of throwing an exception in weird cases there is no problem on just go ahead and create it
-               // again here
-               largeMessageDeliverer = new LargeMessageDeliverer((LargeServerMessage) message, reference);
-            }
-            // The deliverer was prepared during handle, as we can't have more than one pending large message
-            // as it would return busy if there is anything pending
-            largeMessageDeliverer.deliver();
-         } else {
-            deliverStandardMessage(reference, message);
-         }
-      } finally {
-         lockDelivery.readLock().unlock();
-         callback.afterDelivery();
-         if (server.hasBrokerMessagePlugins()) {
-            server.callBrokerMessagePlugins(plugin -> plugin.afterDeliver(this, reference));
+      if (message.isLargeMessage() && supportLargeMessage) {
+         if (largeMessageDeliverer == null) {
+            // This can't really happen as handle had already crated the deliverer
+            // instead of throwing an exception in weird cases there is no problem on just go ahead and create it
+            // again here
+            largeMessageDeliverer = new LargeMessageDeliverer((LargeServerMessage) message, reference);
          }
+         // The deliverer was prepared during handle, as we can't have more than one pending large message
+         // as it would return busy if there is anything pending
+         largeMessageDeliverer.deliver();
+      } else {
+         deliverStandardMessage(reference, message);
       }
+   }
 
+   @Override
+   public void afterDeliver(MessageReference reference) throws Exception {
+      callback.afterDeliver(reference, reference.getMessage(), ServerConsumerImpl.this, reference.getDeliveryCount());
+      if (server.hasBrokerMessagePlugins()) {
+         server.callBrokerMessagePlugins(plugin -> plugin.afterDeliver(this, reference));
+      }
    }
 
    @Override
@@ -626,7 +632,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
     * there are no other messages to be delivered.
     */
    @Override
-   public void forceDelivery(final long sequence)  {
+   public void forceDelivery(final long sequence) {
       forceDelivery(sequence, () -> {
          Message forcedDeliveryMessage = new CoreMessage(storageManager.generateID(), 50);
          MessageReference reference = MessageReference.Factory.createReference(forcedDeliveryMessage, messageQueue);
@@ -730,19 +736,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
    @Override
    public void setStarted(final boolean started) {
       synchronized (lock) {
-         boolean locked = lockDelivery();
-
-         // This is to make sure nothing would sneak to the client while started = false
-         // the client will stop the session and perform a rollback in certain cases.
-         // in case something sneaks to the client you could get to messaging delivering forever until
-         // you restart the server
-         try {
-            this.started = browseOnly || started;
-         } finally {
-            if (locked) {
-               lockDelivery.writeLock().unlock();
-            }
-         }
+         this.started = browseOnly || started;
       }
 
       // Outside the lock
@@ -751,35 +745,10 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
       }
    }
 
-   private boolean lockDelivery() {
-      try {
-         if (!lockDelivery.writeLock().tryLock(30, TimeUnit.SECONDS)) {
-            ActiveMQServerLogger.LOGGER.timeoutLockingConsumer();
-            if (server != null) {
-               server.threadDump();
-            }
-            return false;
-         }
-         return true;
-      } catch (Exception e) {
-         ActiveMQServerLogger.LOGGER.failedToFinishDelivery(e);
-         return false;
-      }
-   }
-
    @Override
    public void setTransferring(final boolean transferring) {
       synchronized (lock) {
-         // This is to make sure that the delivery process has finished any pending delivery
-         // otherwise a message may sneak in on the client while we are trying to stop the consumer
-         boolean locked = lockDelivery();
-         try {
-            this.transferring = transferring;
-         } finally {
-            if (locked) {
-               lockDelivery.writeLock().unlock();
-            }
-         }
+         this.transferring = transferring;
       }
 
       // Outside the lock
@@ -1275,125 +1244,111 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
       }
 
       public boolean deliver() throws Exception {
-         lockDelivery.readLock().lock();
-         try {
-            if (!started) {
-               return false;
-            }
+         if (!started) {
+            return false;
+         }
 
-            LargeServerMessage currentLargeMessage = largeMessage;
-            if (currentLargeMessage == null) {
-               return true;
-            }
+         LargeServerMessage currentLargeMessage = largeMessage;
+         if (currentLargeMessage == null) {
+            return true;
+         }
 
-            if (availableCredits != null && availableCredits.get() <= 0) {
-               if (logger.isTraceEnabled()) {
-                  logger.trace(this + "::FlowControl::delivery largeMessage interrupting as there are no more credits, available=" +
-                                  availableCredits);
-               }
-               releaseHeapBodyBuffer();
-               return false;
+         if (availableCredits != null && availableCredits.get() <= 0) {
+            if (logger.isTraceEnabled()) {
+               logger.trace(this + "::FlowControl::delivery largeMessage interrupting as there are no more credits, available=" + availableCredits);
             }
+            releaseHeapBodyBuffer();
+            return false;
+         }
 
-            if (!sentInitialPacket) {
-               context = currentLargeMessage.getBodyEncoder();
+         if (!sentInitialPacket) {
+            context = currentLargeMessage.getBodyEncoder();
 
-               sizePendingLargeMessage = context.getLargeBodySize();
+            sizePendingLargeMessage = context.getLargeBodySize();
 
-               context.open();
+            context.open();
 
-               sentInitialPacket = true;
+            sentInitialPacket = true;
 
-               int packetSize = callback.sendLargeMessage(ref, currentLargeMessage, ServerConsumerImpl.this, context.getLargeBodySize(), ref.getDeliveryCount());
+            int packetSize = callback.sendLargeMessage(ref, currentLargeMessage, ServerConsumerImpl.this, context.getLargeBodySize(), ref.getDeliveryCount());
 
-               if (availableCredits != null) {
-                  final int credits = availableCredits.addAndGet(-packetSize);
+            if (availableCredits != null) {
+               final int credits = availableCredits.addAndGet(-packetSize);
 
-                  if (credits <= 0) {
-                     releaseHeapBodyBuffer();
-                  }
+               if (credits <= 0) {
+                  releaseHeapBodyBuffer();
+               }
 
-                  if (logger.isTraceEnabled()) {
-                     logger.trace(this + "::FlowControl::" +
-                                     " deliver initialpackage with " +
-                                     packetSize +
-                                     " delivered, available now = " +
-                                     availableCredits);
-                  }
+               if (logger.isTraceEnabled()) {
+                  logger.trace(this + "::FlowControl::" + " deliver initialpackage with " + packetSize + " delivered, available now = " + availableCredits);
                }
+            }
 
-               // Execute the rest of the large message on a different thread so as not to tie up the delivery thread
-               // for too long
+            // Execute the rest of the large message on a different thread so as not to tie up the delivery thread
+            // for too long
 
-               resumeLargeMessage();
+            resumeLargeMessage();
 
-               return false;
-            } else {
-               if (availableCredits != null && availableCredits.get() <= 0) {
-                  if (logger.isTraceEnabled()) {
-                     logger.trace(this + "::FlowControl::deliverLargeMessage Leaving loop of send LargeMessage because of credits, available=" +
-                                     availableCredits);
-                  }
-                  releaseHeapBodyBuffer();
-                  return false;
+            return false;
+         } else {
+            if (availableCredits != null && availableCredits.get() <= 0) {
+               if (logger.isTraceEnabled()) {
+                  logger.trace(this + "::FlowControl::deliverLargeMessage Leaving loop of send LargeMessage because of credits, available=" + availableCredits);
                }
+               releaseHeapBodyBuffer();
+               return false;
+            }
 
-               final int localChunkLen = (int) Math.min(sizePendingLargeMessage - positionPendingLargeMessage, minLargeMessageSize);
+            final int localChunkLen = (int) Math.min(sizePendingLargeMessage - positionPendingLargeMessage, minLargeMessageSize);
 
-               final ByteBuffer bodyBuffer = acquireHeapBodyBuffer(localChunkLen);
+            final ByteBuffer bodyBuffer = acquireHeapBodyBuffer(localChunkLen);
 
-               assert bodyBuffer.remaining() == localChunkLen;
+            assert bodyBuffer.remaining() == localChunkLen;
 
-               final int readBytes = context.encode(bodyBuffer);
+            final int readBytes = context.encode(bodyBuffer);
 
-               assert readBytes == localChunkLen;
+            assert readBytes == localChunkLen;
 
-               final byte[] body = bodyBuffer.array();
+            final byte[] body = bodyBuffer.array();
 
-               assert body.length == readBytes;
+            assert body.length == readBytes;
 
-               //It is possible to recycle the same heap body buffer because it won't be cached by sendLargeMessageContinuation
-               //given that requiresResponse is false: ChannelImpl::send will use the resend cache only if
-               //resendCache != null && packet.isRequiresConfirmations()
+            //It is possible to recycle the same heap body buffer because it won't be cached by sendLargeMessageContinuation
+            //given that requiresResponse is false: ChannelImpl::send will use the resend cache only if
+            //resendCache != null && packet.isRequiresConfirmations()
 
-               int packetSize = callback.sendLargeMessageContinuation(ServerConsumerImpl.this, body, positionPendingLargeMessage + localChunkLen < sizePendingLargeMessage, false);
+            int packetSize = callback.sendLargeMessageContinuation(ServerConsumerImpl.this, body, positionPendingLargeMessage + localChunkLen < sizePendingLargeMessage, false);
 
-               int chunkLen = body.length;
+            int chunkLen = body.length;
 
-               if (availableCredits != null) {
-                  final int credits = availableCredits.addAndGet(-packetSize);
+            if (availableCredits != null) {
+               final int credits = availableCredits.addAndGet(-packetSize);
 
-                  if (credits <= 0) {
-                     releaseHeapBodyBuffer();
-                  }
+               if (credits <= 0) {
+                  releaseHeapBodyBuffer();
+               }
 
-                  if (logger.isTraceEnabled()) {
-                     logger.trace(this + "::FlowControl::largeMessage deliver continuation, packetSize=" +
-                                     packetSize +
-                                     " available now=" +
-                                     availableCredits);
-                  }
+               if (logger.isTraceEnabled()) {
+                  logger.trace(this + "::FlowControl::largeMessage deliver continuation, packetSize=" + packetSize + " available now=" + availableCredits);
                }
+            }
 
-               positionPendingLargeMessage += chunkLen;
+            positionPendingLargeMessage += chunkLen;
 
-               if (positionPendingLargeMessage < sizePendingLargeMessage) {
-                  resumeLargeMessage();
+            if (positionPendingLargeMessage < sizePendingLargeMessage) {
+               resumeLargeMessage();
 
-                  return false;
-               }
+               return false;
             }
+         }
 
-            if (logger.isTraceEnabled()) {
-               logger.trace("Finished deliverLargeMessage");
-            }
+         if (logger.isTraceEnabled()) {
+            logger.trace("Finished deliverLargeMessage");
+         }
 
-            finish();
+         finish();
 
-            return true;
-         } finally {
-            lockDelivery.readLock().unlock();
-         }
+         return true;
       }
 
       public void finish() throws Exception {
@@ -1453,7 +1408,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
                }
 
                if (status == HandleStatus.HANDLED) {
-                  proceedDeliver(current);
+                  afterDeliver(current);
                }
 
                current = null;
@@ -1481,7 +1436,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
                }
 
                if (status == HandleStatus.HANDLED) {
-                  proceedDeliver(ref);
+                  afterDeliver(ref);
                } else if (status == HandleStatus.BUSY) {
                   // keep a reference on the current message reference
                   // to handle it next time the browser deliverer is executed
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
index 5577522..18ef253 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
@@ -41,11 +41,10 @@ public interface SessionCallback {
     */
    boolean hasCredits(ServerConsumer consumerID);
 
-   /**
-    * This can be used to complete certain operations outside of the lock,
-    * like acks or other operations.
-    */
-   void afterDelivery() throws Exception;
+   // Certain protocols (MQTT) will need to confirm messages doing things such as individualACKS
+   // and these need to be done outside of the main lock.
+   // otherwise we could dead-lock during delivery
+   void afterDeliver(MessageReference ref, Message message, ServerConsumer consumerID, int deliveryCount) throws Exception;
 
    /**
     * Use this to updates specifics on the message after a redelivery happened.
@@ -69,6 +68,7 @@ public interface SessionCallback {
    //       Future developments may change this, but beware why I have chosen to keep the parameter separated here
    int sendMessage(MessageReference ref, Message message, ServerConsumer consumerID, int deliveryCount);
 
+
    int sendLargeMessage(MessageReference reference,
                         Message message,
                         ServerConsumer consumerID,
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
index 58bf2d3..9bc0ea2 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
@@ -190,7 +190,7 @@ public class DummyServerConsumer implements ServerConsumer {
    }
 
    @Override
-   public void proceedDeliver(MessageReference reference) throws Exception {
+   public void afterDeliver(MessageReference reference) throws Exception {
 
    }
 
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
index 3e64ac5..0006752 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
@@ -528,11 +528,6 @@ public class HangConsumerTest extends ActiveMQTestBase {
       }
 
       @Override
-      public void afterDelivery() throws Exception {
-
-      }
-
-      @Override
       public void sendProducerCreditsFailMessage(int credits, SimpleString address) {
          targetCallback.sendProducerCreditsFailMessage(credits, address);
       }
@@ -558,6 +553,11 @@ public class HangConsumerTest extends ActiveMQTestBase {
          }
       }
 
+      @Override
+      public void afterDeliver(MessageReference ref, Message message, ServerConsumer consumerID, int deliveryCount) {
+
+      }
+
       /* (non-Javadoc)
        * @see SessionCallback#sendLargeMessage(org.apache.activemq.artemis.core.server.ServerMessage, long, long, int)
        */
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/GroupingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/GroupingTest.java
index ba8cd95..fc32c44 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/GroupingTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/GroupingTest.java
@@ -271,7 +271,6 @@ public class GroupingTest extends JMSTestBase {
 
          assertEquals(tm.getStringProperty("JMSXGroupID"), jmsxgroupID);
       }
-      Thread.sleep(2000);
       //session.rollback();
       //session.close();
       //consume all msgs from 2nd first consumer
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java
index f20cce3..076e164 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java
@@ -1312,12 +1312,21 @@ public class QueueImplTest extends ActiveMQTestBase {
 
          @Override
          public synchronized HandleStatus handle(MessageReference reference) {
+            return HandleStatus.HANDLED;
+         }
+
+         @Override
+         public Object handleWithGroup(GroupHandler groupHandler, boolean newGroup, MessageReference reference) throws Exception {
             if (count == 0) {
                //the first message is handled and will be used to determine this consumer
                //to be the group consumer
                count++;
                firstMessageHandled.countDown();
-               return HandleStatus.HANDLED;
+               if (groupHandler != null) {
+                  return groupHandler.handleMessageGroup(reference, this, newGroup);
+               } else {
+                  return HandleStatus.HANDLED;
+               }
             } else if (count <= 2) {
                //the next two attempts to send the second message will be done
                //attempting a direct delivery and an async one after that
@@ -1329,7 +1338,11 @@ public class QueueImplTest extends ActiveMQTestBase {
                //the second message should have stop the delivery loop:
                //it will succeed just to let the message being handled and
                //reduce the message count to 0
-               return HandleStatus.HANDLED;
+               if (groupHandler != null) {
+                  return groupHandler.handleMessageGroup(reference, this, newGroup);
+               } else {
+                  return HandleStatus.HANDLED;
+               }
             }
          }
       };
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java
index 2a5a330..47b042b 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java
@@ -138,7 +138,7 @@ public class FakeConsumer implements Consumer {
    }
 
    @Override
-   public void proceedDeliver(MessageReference ref) throws Exception {
+   public void afterDeliver(MessageReference ref) throws Exception {
       // no op
    }