Posted to gitbox@activemq.apache.org by GitBox <gi...@apache.org> on 2021/09/01 09:22:12 UTC

[GitHub] [activemq-artemis] franz1981 commented on a change in pull request #3711: ARTEMIS-3449 Speedup AMQP large message streaming

franz1981 commented on a change in pull request #3711:
URL: https://github.com/apache/activemq-artemis/pull/3711#discussion_r699475580



##########
File path: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
##########
@@ -587,52 +586,48 @@ void deliver() {
          LargeBodyReader context = message.getLargeBodyReader();
          try {
             context.open();
+            final ByteBuf tmpFrameBuf = PooledByteBufAllocator.DEFAULT.directBuffer(frameSize);
+            final NettyReadable nettyReadable = new NettyReadable(tmpFrameBuf);
             try {
+
                context.position(position);
                long bodySize = context.getSize();
-
-               ByteBuffer buf = ByteBuffer.allocate(frameSize);
+               // materialize it so we can use its internal NIO buffer
+               tmpFrameBuf.ensureWritable(frameSize);
 
                for (; sender.getLocalState() != EndpointState.CLOSED && position < bodySize; ) {
                   if (!connection.flowControl(this::resume)) {
                      context.close();
                      return;
                   }
-                  buf.clear();
-                  int size = 0;
-
-                  try {
-                     if (position == 0) {
-                        replaceInitialHeader(deliveryAnnotationsToEncode, context, WritableBuffer.ByteBufferWrapper.wrap(buf));
-                     }
-                     size = context.readInto(buf);
-
-                     sender.send(new ReadableBuffer.ByteBufferReader(buf));
-                     position += size;
-                  } catch (java.nio.BufferOverflowException overflowException) {
-                     if (position == 0) {
-                        if (log.isDebugEnabled()) {
-                           log.debug("Delivery of message failed with an overFlowException, retrying again with expandable buffer");
-                        }
-                        // on the very first packet, if the initial header was replaced with a much bigger header (re-encoding)
-                        // we could recover the situation with a retry using an expandable buffer.
-                        // this is tested on org.apache.activemq.artemis.tests.integration.amqp.AmqpMessageDivertsTest
-                        size = retryInitialPacketWithExpandableBuffer(deliveryAnnotationsToEncode, context, buf);
+                  tmpFrameBuf.clear();
+                  int headerSize = 0;
+                  int readLimit = frameSize;
+                  if (position == 0) {
+                     replaceInitialHeader(deliveryAnnotationsToEncode, context, new NettyWritable(tmpFrameBuf));
+                     headerSize = tmpFrameBuf.writerIndex();
+                     if (headerSize <= frameSize) {
+                        readLimit = frameSize - headerSize;

Review comment:
       > The variable is also confusingly named, since it is absolutely not the 'header size'; it's only non-header stuff that can ever get it into this situation.
   
   I can rework the naming to better capture what's happening here.
   
   > why not just send the generated first payload at this point (as the old version did when the initial buffer wasn't big enough, in retryInitialPacketWithExpandableBuffer())?
   
   I think that can be done; it means we would skip reading the file on this round. Let me rework it that way, I like it.
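
A minimal sketch of that idea, for illustration only. It uses plain `java.nio.ByteBuffer` instead of the Netty and Proton types in the PR, and every name in it (`FirstFrameSketch`, `frames`, `readLimit` as used here) is hypothetical rather than taken from the patch. It assumes the re-encoded header is already available as a byte array and that `body` holds only the bytes that follow the original header in the file.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration, not the code from the pull request.
final class FirstFrameSketch {

   /**
    * Splits a re-encoded header plus a body into frames.
    * If the header alone fills or exceeds frameSize, it is sent by itself
    * and no body bytes are read on that first round.
    */
   static List<byte[]> frames(byte[] header, byte[] body, int frameSize) {
      if (frameSize <= 0) {
         throw new IllegalArgumentException("frameSize must be positive");
      }
      List<byte[]> sent = new ArrayList<>();
      // Stands in for the large body reader positioned after the original header.
      ByteBuffer bodyReader = ByteBuffer.wrap(body);
      boolean first = true;
      while (first || bodyReader.hasRemaining()) {
         int headerBytes = first ? header.length : 0;
         // Room left for body bytes; zero when the header already used the whole frame.
         int readLimit = Math.max(0, frameSize - headerBytes);
         int bodyBytes = Math.min(readLimit, bodyReader.remaining());
         byte[] frame = new byte[headerBytes + bodyBytes];
         if (first) {
            System.arraycopy(header, 0, frame, 0, headerBytes);
         }
         bodyReader.get(frame, headerBytes, bodyBytes);
         if (frame.length > 0) {
            sent.add(frame); // stands in for sender.send(...)
         }
         first = false;
      }
      return sent;
   }
}
```

With a 10-byte header, a 5-byte body and a frame size of 4, the first frame carries only the 10 header bytes, and the body follows in frames of 4 and 1 bytes. Whether an oversized first frame is acceptable to the peer is a separate question that this sketch does not address.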
   
   
   

##########
File path: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java
##########
@@ -146,16 +147,6 @@ private AMQPLargeMessage(final AMQPLargeMessage copy,
       setMessageID(newID);
    }
 
-   public void openLargeMessage() throws Exception {

Review comment:
       @clebertsuconic any thoughts here?



