You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@activemq.apache.org by cl...@apache.org on 2024/03/06 20:21:06 UTC

(activemq-artemis) branch main updated: ARTEMIS-4668 Moving AMQP Large Message file handling away from Netty thread

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new c83ed8957d ARTEMIS-4668 Moving AMQP Large Message file handling away from Netty thread
c83ed8957d is described below

commit c83ed8957d9d7f06bb29c0fce563fe2e3462993e
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Tue Mar 5 16:17:50 2024 -0500

    ARTEMIS-4668 Moving AMQP Large Message file handling away from Netty thread
---
 .../protocol/amqp/broker/AMQPSessionCallback.java  |  4 ++
 .../amqp/proton/AMQPConnectionContext.java         | 13 ++++
 .../amqp/proton/AMQPLargeMessageReader.java        | 78 ++++++++++++++--------
 .../amqp/proton/AMQPLargeMessageWriter.java        | 58 +++++++++++-----
 .../proton/AMQPTunneledCoreLargeMessageWriter.java |  2 +-
 .../protocol/amqp/proton/MessageReader.java        |  3 +
 .../protocol/amqp/proton/MessageWriter.java        |  2 +-
 .../amqp/proton/ProtonAbstractReceiver.java        | 65 +++++++++++++-----
 .../amqp/proton/ProtonServerSenderContext.java     |  2 +-
 .../amqp/proton/handler/ProtonHandler.java         | 18 +++--
 .../AMQPTunneledCoreLargeMessageWriterTest.java    |  7 +-
 .../proton/AMQPTunneledCoreMessageWriterTest.java  |  5 +-
 12 files changed, 180 insertions(+), 77 deletions(-)

diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index c8f081b30b..c67cac0a1d 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -172,6 +172,10 @@ public class AMQPSessionCallback implements SessionCallback {
       }
    }
 
+   public void execute(Runnable run) {
+      sessionExecutor.execute(run);
+   }
+
    public void afterIO(IOCallback ioCallback) {
       OperationContext context = recoverContext();
       try {
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
index b84823ee2d..e1f6fe192c 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
@@ -94,6 +94,19 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
 
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
+   public void disableAutoRead() {
+      handler.requireHandler();
+      connectionCallback.getTransportConnection().setAutoRead(false);
+      handler.setReadable(false);
+   }
+
+   public void enableAutoRead() {
+      handler.requireHandler();
+      connectionCallback.getTransportConnection().setAutoRead(true);
+      getHandler().setReadable(true);
+      flush();
+   }
+
    public static final Symbol CONNECTION_OPEN_FAILED = Symbol.valueOf("amqp:connection-establishment-failed");
    public static final String AMQP_CONTAINER_ID = "amqp-container-id";
    private static final FutureTask<Void> VOID_FUTURE = new FutureTask<>(() -> { }, null);
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageReader.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageReader.java
index 6a44c6339f..63af7b1418 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageReader.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageReader.java
@@ -34,7 +34,7 @@ public class AMQPLargeMessageReader implements MessageReader {
 
    private final ProtonAbstractReceiver serverReceiver;
 
-   private AMQPLargeMessage currentMessage;
+   private volatile AMQPLargeMessage currentMessage;
    private DeliveryAnnotations deliveryAnnotations;
    private boolean closed = true;
 
@@ -50,14 +50,15 @@ public class AMQPLargeMessageReader implements MessageReader {
    @Override
    public void close() {
       if (!closed) {
-         if (currentMessage != null) {
-            try {
-               currentMessage.deleteFile();
-            } catch (Throwable error) {
-               ActiveMQServerLogger.LOGGER.errorDeletingLargeMessageFile(error);
-            } finally {
-               currentMessage = null;
+         try {
+            AMQPLargeMessage localCurrentMessage = currentMessage;
+            if (localCurrentMessage != null) {
+               localCurrentMessage.deleteFile();
             }
+         } catch (Throwable error) {
+            ActiveMQServerLogger.LOGGER.errorDeletingLargeMessageFile(error);
+         } finally {
+            currentMessage = null;
          }
 
          deliveryAnnotations = null;
@@ -82,34 +83,53 @@ public class AMQPLargeMessageReader implements MessageReader {
          throw new IllegalStateException("AMQP Large Message Reader is closed and read cannot proceed");
       }
 
-      final Receiver receiver = ((Receiver) delivery.getLink());
-      final ReadableBuffer dataBuffer = receiver.recv();
+      try {
+         serverReceiver.connection.requireInHandler();
+
+         final Receiver receiver = ((Receiver) delivery.getLink());
+         final ReadableBuffer dataBuffer = receiver.recv();
 
-      if (currentMessage == null) {
          final AMQPSessionCallback sessionSPI = serverReceiver.getSessionContext().getSessionSPI();
-         final long id = sessionSPI.getStorageManager().generateID();
-         currentMessage = new AMQPLargeMessage(id, delivery.getMessageFormat(), null,
-                                               sessionSPI.getCoreMessageObjectPools(),
-                                               sessionSPI.getStorageManager());
-         currentMessage.parseHeader(dataBuffer);
 
-         sessionSPI.getStorageManager().onLargeMessageCreate(id, currentMessage);
-      }
+         if (currentMessage == null) {
+            final long id = sessionSPI.getStorageManager().generateID();
+            AMQPLargeMessage localCurrentMessage = new AMQPLargeMessage(id, delivery.getMessageFormat(), null, sessionSPI.getCoreMessageObjectPools(), sessionSPI.getStorageManager());
+            localCurrentMessage.parseHeader(dataBuffer);
+
+            sessionSPI.getStorageManager().onLargeMessageCreate(id, localCurrentMessage);
+            currentMessage = localCurrentMessage;
+         }
+
+         serverReceiver.getConnection().disableAutoRead();
 
-      currentMessage.addBytes(dataBuffer);
+         boolean partial = delivery.isPartial();
 
-      final AMQPLargeMessage result;
+         sessionSPI.execute(() -> addBytes(delivery, dataBuffer, partial));
 
-      if (!delivery.isPartial()) {
-         currentMessage.releaseResources(serverReceiver.getConnection().isLargeMessageSync(), true);
-         result = currentMessage;
-         // We don't want a close to delete the file now, we've released the resources.
-         currentMessage = null;
-         deliveryAnnotations = result.getDeliveryAnnotations();
-      } else {
-         result = null;
+         return null;
+      } catch (Exception e) {
+         // if an exception happened we must enable it back
+         serverReceiver.getConnection().enableAutoRead();
+         throw e;
       }
+   }
+
+   private void addBytes(Delivery delivery, ReadableBuffer dataBuffer, boolean isPartial) {
+      final AMQPLargeMessage localCurrentMessage = currentMessage;
+
+      try {
+         localCurrentMessage.addBytes(dataBuffer);
 
-      return result;
+         if (!isPartial) {
+            localCurrentMessage.releaseResources(serverReceiver.getConnection().isLargeMessageSync(), true);
+            // We don't want a close to delete the file now, we've released the resources.
+            currentMessage = null;
+            serverReceiver.connection.runNow(() -> serverReceiver.onMessageComplete(delivery, localCurrentMessage, localCurrentMessage.getDeliveryAnnotations()));
+         }
+      } catch (Throwable e) {
+         serverReceiver.onExceptionWhileReading(e);
+      } finally {
+         serverReceiver.connection.runNow(serverReceiver.getConnection()::enableAutoRead);
+      }
    }
 }
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageWriter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageWriter.java
index f6ef4ad20b..d6fccce451 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageWriter.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageWriter.java
@@ -59,6 +59,9 @@ public class AMQPLargeMessageWriter implements MessageWriter {
 
    private MessageReference reference;
    private AMQPLargeMessage message;
+
+   private LargeBodyReader largeBodyReader;
+
    private Delivery delivery;
    private long position;
    private boolean initialPacketHandled;
@@ -81,33 +84,59 @@ public class AMQPLargeMessageWriter implements MessageWriter {
    public void close() {
       if (!closed) {
          try {
+            try {
+               if (largeBodyReader != null) {
+                  largeBodyReader.close();
+               }
+            } catch (Exception e) {
+               // if we get an error only at this point, there's nothing else we could do other than log.warn
+               logger.warn("{}", e.getMessage(), e);
+            }
             if (message != null) {
                message.usageDown();
             }
          } finally {
-            reset(true);
+            resetClosed();
          }
       }
    }
 
    @Override
-   public AMQPLargeMessageWriter open() {
+   public AMQPLargeMessageWriter open(MessageReference reference) {
       if (!closed) {
          throw new IllegalStateException("Trying to open an AMQP Large Message writer that was not closed");
       }
 
-      reset(false);
+      this.reference = reference;
+      this.message = (AMQPLargeMessage) reference.getMessage();
+      this.message.usageUp();
+
+      try {
+         largeBodyReader = message.getLargeBodyReader();
+         largeBodyReader.open();
+      } catch (Exception e) {
+         serverSender.reportDeliveryError(this, reference, e);
+      }
+
+      resetOpen();
 
       return this;
    }
 
-   private void reset(boolean closedState) {
+   private void resetClosed() {
       message = null;
       reference = null;
       delivery = null;
+      largeBodyReader = null;
       position = 0;
       initialPacketHandled = false;
-      closed = closedState;
+      closed = true;
+   }
+
+   private void resetOpen() {
+      position = 0;
+      initialPacketHandled = false;
+      closed = false;
    }
 
    @Override
@@ -121,17 +150,15 @@ public class AMQPLargeMessageWriter implements MessageWriter {
          throw new IllegalStateException("Cannot write to an AMQP Large Message Writer that has been closed");
       }
 
-      this.reference = messageReference;
-      this.message = (AMQPLargeMessage) messageReference.getMessage();
-
       if (sessionSPI.invokeOutgoing(message, (ActiveMQProtonRemotingConnection) sessionSPI.getTransportConnection().getProtocolConnection()) != null) {
+         // an interceptor rejected the delivery
+         // since we opened the message as part of the queue executor we must close it now
+         close();
          return;
       }
 
       this.delivery = serverSender.createDelivery(messageReference, (int) this.message.getMessageFormat());
 
-      message.usageUp();
-
       tryDelivering();
    }
 
@@ -150,15 +177,14 @@ public class AMQPLargeMessageWriter implements MessageWriter {
          final ByteBuf frameBuffer = PooledByteBufAllocator.DEFAULT.directBuffer(frameSize, frameSize);
          final NettyReadable frameView = new NettyReadable(frameBuffer);
 
-         try (LargeBodyReader context = message.getLargeBodyReader()) {
-            context.open();
-            context.position(position);
-            long bodySize = context.getSize();
+         try {
+            largeBodyReader.position(position);
+            long bodySize = largeBodyReader.getSize();
             // materialize it so we can use its internal NIO buffer
             frameBuffer.ensureWritable(frameSize);
 
             if (!initialPacketHandled && protonSender.getLocalState() != EndpointState.CLOSED) {
-               if (!deliverInitialPacket(context, frameBuffer)) {
+               if (!deliverInitialPacket(largeBodyReader, frameBuffer)) {
                   return;
                }
 
@@ -171,7 +197,7 @@ public class AMQPLargeMessageWriter implements MessageWriter {
                }
                frameBuffer.clear();
 
-               final int readSize = context.readInto(frameBuffer.internalNioBuffer(0, frameSize));
+               final int readSize = largeBodyReader.readInto(frameBuffer.internalNioBuffer(0, frameSize));
 
                frameBuffer.writerIndex(readSize);
 
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageWriter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageWriter.java
index 94b118db12..04a3ad2f1b 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageWriter.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageWriter.java
@@ -129,7 +129,7 @@ public class AMQPTunneledCoreLargeMessageWriter implements MessageWriter {
    }
 
    @Override
-   public AMQPTunneledCoreLargeMessageWriter open() {
+   public AMQPTunneledCoreLargeMessageWriter open(MessageReference reference) {
       if (state != State.CLOSED) {
          throw new IllegalStateException("Trying to open an AMQP Large Message writer that was not closed");
       }
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/MessageReader.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/MessageReader.java
index 8b4db7a04e..020dc6d142 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/MessageReader.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/MessageReader.java
@@ -49,6 +49,9 @@ public interface MessageReader {
     * and is no longer partial the readBytes method will return the decoded message
     * for dispatch.
     *
+    * Notice that asynchronous Readers will never return the Message but will rather call a complete operation on the
+    * Server Receiver.
+    *
     * @param delivery
     *    The delivery that has pending incoming bytes.
     */
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/MessageWriter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/MessageWriter.java
index 8afac2c8b3..911ffcc590 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/MessageWriter.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/MessageWriter.java
@@ -90,7 +90,7 @@ public interface MessageWriter extends Consumer<MessageReference> {
     * be called on every handler by the sender context as it doesn't know which instances need
     * opened.
     */
-   default MessageWriter open() {
+   default MessageWriter open(MessageReference reference) {
       // Default for stateless handlers is to do nothing here.
       return this;
    }
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonAbstractReceiver.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonAbstractReceiver.java
index 62dc963634..9dbfe5406d 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonAbstractReceiver.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonAbstractReceiver.java
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.artemis.protocol.amqp.proton;
 
+import java.lang.invoke.MethodHandles;
+
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
 import org.apache.activemq.artemis.core.server.RoutingContext;
@@ -26,12 +28,17 @@ import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPExceptio
 import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
 import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
 import org.apache.qpid.proton.amqp.transaction.TransactionalState;
+import org.apache.qpid.proton.amqp.transport.AmqpError;
 import org.apache.qpid.proton.amqp.transport.ErrorCondition;
 import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.Receiver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public abstract class ProtonAbstractReceiver extends ProtonInitializable implements ProtonDeliveryHandler {
 
+   private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
    protected final AMQPConnectionContext connection;
 
    protected final AMQPSessionContext protonSession;
@@ -302,8 +309,6 @@ public abstract class ProtonAbstractReceiver extends ProtonInitializable impleme
    public final void onMessage(Delivery delivery) throws ActiveMQAMQPException {
       connection.requireInHandler();
 
-      final Receiver receiver = ((Receiver) delivery.getLink());
-
       if (receiver.current() != delivery) {
          return;
       }
@@ -320,28 +325,41 @@ public abstract class ProtonAbstractReceiver extends ProtonInitializable impleme
             return;
          }
 
-         final Message message = messageReader.readBytes(delivery);
-
-         if (message != null) {
-            // Fetch this before the close of the reader as that will clear any read message
-            // delivery annotations.
-            final DeliveryAnnotations deliveryAnnotations = messageReader.getDeliveryAnnotations();
+         Message completeMessage;
+         if ((completeMessage = messageReader.readBytes(delivery)) != null) {
+            // notice the AMQP Large Message Reader will always return null
+            // and call the onMessageComplete directly
+            // since that happens asynchronously
+            onMessageComplete(delivery, completeMessage, messageReader.getDeliveryAnnotations());
+         }
+      } catch (Exception e) {
+         logger.warn(e.getMessage(), e);
+         throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
+      }
+   }
 
-            this.messageReader.close();
-            this.messageReader = null;
+   public void onMessageComplete(Delivery delivery,
+                          Message message, DeliveryAnnotations deliveryAnnotations) {
+      connection.requireInHandler();
 
-            receiver.advance();
+      try {
+         receiver.advance();
 
-            Transaction tx = null;
-            if (delivery.getRemoteState() instanceof TransactionalState) {
-               TransactionalState txState = (TransactionalState) delivery.getRemoteState();
+         Transaction tx = null;
+         if (delivery.getRemoteState() instanceof TransactionalState) {
+            TransactionalState txState = (TransactionalState) delivery.getRemoteState();
+            try {
                tx = this.sessionSPI.getTransaction(txState.getTxnId(), false);
+            } catch (Exception e) {
+               this.onExceptionWhileReading(e);
             }
-
-            actualDelivery(message, delivery, deliveryAnnotations, receiver, tx);
          }
-      } catch (Exception e) {
-         throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
+
+         actualDelivery(message, delivery, deliveryAnnotations, receiver, tx);
+      } finally {
+         // reader is complete, we give it up now
+         this.messageReader.close();
+         this.messageReader = null;
       }
    }
 
@@ -351,6 +369,17 @@ public abstract class ProtonAbstractReceiver extends ProtonInitializable impleme
       closeCurrentReader();
    }
 
+   public void onExceptionWhileReading(Throwable e) {
+      logger.warn(e.getMessage(), e);
+      connection.runNow(() -> {
+         // setting it enabled just in case a large message reader disabled it
+         connection.enableAutoRead();
+         ErrorCondition ec = new ErrorCondition(AmqpError.INTERNAL_ERROR, e.getMessage());
+         connection.close(ec);
+         connection.flush();
+      });
+   }
+
    @Override
    public void close(ErrorCondition condition) throws ActiveMQAMQPException {
       receiver.setCondition(condition);
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index 9359f7fe94..ad0d42c20e 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -483,7 +483,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
          credits--;
       }
 
-      final MessageWriter messageWriter = controller.selectOutgoingMessageWriter(this, messageReference).open();
+      final MessageWriter messageWriter = controller.selectOutgoingMessageWriter(this, messageReference).open(messageReference);
 
       // Preserve for hasCredits to check for busy state and possible abort on close
       this.messageWriter = messageWriter;
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
index 5bd3537ef5..17f17b0c70 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
@@ -95,6 +95,8 @@ public class ProtonHandler extends ProtonInitializable implements SaslListener {
 
    boolean flushInstantly = false;
 
+   volatile boolean readable = true;
+
    /** afterFlush and afterFlushSet properties
     *  are set by afterFlush methods.
     *  This is to be called after the flush loop.
@@ -108,6 +110,15 @@ public class ProtonHandler extends ProtonInitializable implements SaslListener {
    private Runnable afterFlush;
    protected Set<Runnable> afterFlushSet;
 
+   public boolean isReadable() {
+      return readable;
+   }
+
+   public ProtonHandler setReadable(boolean readable) {
+      this.readable = readable;
+      return this;
+   }
+
    @Override
    public void initialize() throws Exception {
       initialized = true;
@@ -381,11 +392,6 @@ public class ProtonHandler extends ProtonInitializable implements SaslListener {
          flush();
       });
 
-      /*try {
-         Thread.sleep(1000);
-      } catch (Exception e) {
-         e.printStackTrace();
-      } */
       // this needs to be done in two steps
       // we first flush what we have to the client
       // after flushed, we close the local connection
@@ -562,7 +568,7 @@ public class ProtonHandler extends ProtonInitializable implements SaslListener {
                AuditLogger.setRemoteAddress(h.getRemoteAddress());
             }
          }
-         while ((ev = collector.peek()) != null) {
+         while (isReadable() && (ev = collector.peek()) != null) {
             for (EventHandler h : handlers) {
                logger.trace("Handling {} towards {}", ev, h);
 
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageWriterTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageWriterTest.java
index 44e85d358d..c686a59384 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageWriterTest.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageWriterTest.java
@@ -55,6 +55,7 @@ import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Captor;
 import org.mockito.Mock;
+import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
 import org.mockito.Spy;
 
@@ -150,7 +151,7 @@ public class AMQPTunneledCoreLargeMessageWriterTest {
 
       when(protonSender.getLocalState()).thenReturn(EndpointState.CLOSED);
 
-      writer.open();
+      writer.open(Mockito.mock(MessageReference.class));
 
       try {
          writer.writeBytes(reference);
@@ -177,7 +178,7 @@ public class AMQPTunneledCoreLargeMessageWriterTest {
    private void doTestMessageEncodingWrittenToDeliveryWithAnnotations(boolean deliveryAnnotations) throws Exception {
       AMQPTunneledCoreLargeMessageWriter writer = new AMQPTunneledCoreLargeMessageWriter(serverSender);
 
-      writer.open();
+      writer.open(Mockito.mock(MessageReference.class));
 
       final ByteBuf expectedEncoding = Unpooled.buffer();
 
@@ -276,7 +277,7 @@ public class AMQPTunneledCoreLargeMessageWriterTest {
    public void testLargeMessageUsageLoweredOnCloseWhenWriteNotCompleted() throws Exception {
       AMQPTunneledCoreLargeMessageWriter writer = new AMQPTunneledCoreLargeMessageWriter(serverSender);
 
-      writer.open();
+      writer.open(Mockito.mock(MessageReference.class));
 
       when(protonSender.getLocalState()).thenReturn(EndpointState.ACTIVE);
       when(protonDelivery.isPartial()).thenReturn(true);
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreMessageWriterTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreMessageWriterTest.java
index 9245fa095b..7a09c0911b 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreMessageWriterTest.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreMessageWriterTest.java
@@ -52,6 +52,7 @@ import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Captor;
 import org.mockito.Mock;
+import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
 import org.mockito.Spy;
 
@@ -110,7 +111,7 @@ public class AMQPTunneledCoreMessageWriterTest {
 
       when(protonSender.getLocalState()).thenReturn(EndpointState.CLOSED);
 
-      writer.open();
+      writer.open(Mockito.mock(MessageReference.class));
 
       try {
          writer.writeBytes(reference);
@@ -171,7 +172,7 @@ public class AMQPTunneledCoreMessageWriterTest {
          return null;
       }).when(message).persist(any(ActiveMQBuffer.class));
 
-      writer.open();
+      writer.open(Mockito.mock(MessageReference.class));
 
       try {
          writer.writeBytes(reference);