You are viewing a plain text version of this content. The canonical link for it is here.
Posted to gitbox@activemq.apache.org by GitBox <gi...@apache.org> on 2021/02/04 20:35:54 UTC

[GitHub] [activemq-artemis] clebertsuconic commented on a change in pull request #3334: ARTEMIS-2984 Compressed large messages can leak native resources

clebertsuconic commented on a change in pull request #3334:
URL: https://github.com/apache/activemq-artemis/pull/3334#discussion_r570526133



##########
File path: artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
##########
@@ -410,108 +410,117 @@ private void largeMessageSendBuffered(final boolean sendBlocking,
    /**
     * @param sendBlocking
     * @param msgI
-    * @param inputStreamParameter
+    * @param inputStream
     * @param credits
     * @throws ActiveMQException
     */
    private void largeMessageSendStreamed(final boolean sendBlocking,
                                          final ICoreMessage msgI,
-                                         final InputStream inputStreamParameter,
+                                         final InputStream inputStream,
                                          final ClientProducerCredits credits,
                                          SendAcknowledgementHandler handler) throws ActiveMQException {
-      boolean lastPacket = false;
+      final DeflaterReader deflaterReader = session.isCompressLargeMessages() ? new DeflaterReader(inputStream) : null;
 
-      InputStream input = inputStreamParameter;
+      try (InputStream input = session.isCompressLargeMessages() ? deflaterReader : inputStream) {
 
-      // We won't know the real size of the message since we are compressing while reading the streaming.
-      // This counter will be passed to the deflater to be updated for every byte read
-      AtomicLong messageSize = new AtomicLong();
-
-      DeflaterReader deflaterReader = null;
-
-      if (session.isCompressLargeMessages()) {
-         msgI.putBooleanProperty(Message.HDR_LARGE_COMPRESSED, true);
-         deflaterReader = new DeflaterReader(inputStreamParameter, messageSize);
-         input = deflaterReader;
-      }
-
-      long totalSize = 0;
-
-      boolean headerSent = false;
-
-      int reconnectID = sessionContext.getReconnectID();
-      while (!lastPacket) {
-         byte[] buff = new byte[minLargeMessageSize];
-
-         int pos = 0;
-
-         do {
-            int numberOfBytesRead;
-
-            int wanted = minLargeMessageSize - pos;
-
-            try {
-               numberOfBytesRead = input.read(buff, pos, wanted);
-            } catch (IOException e) {
-               throw ActiveMQClientMessageBundle.BUNDLE.errorReadingBody(e);
-            }
-
-            if (numberOfBytesRead == -1) {
-               lastPacket = true;
-
-               break;
-            }
-
-            pos += numberOfBytesRead;
+         if (session.isCompressLargeMessages()) {
+            msgI.putBooleanProperty(Message.HDR_LARGE_COMPRESSED, true);
          }
-         while (pos < minLargeMessageSize);
 
-         totalSize += pos;
+         long totalBytesRead = 0;
 
-         if (lastPacket) {
-            if (!session.isCompressLargeMessages()) {
-               messageSize.set(totalSize);
-            }
+         boolean headerSent = false;
+
+         int reconnectID = sessionContext.getReconnectID();
+         while (true) {
+            final byte[] scratchBuffer = new byte[minLargeMessageSize];
 
-            // This is replacing the last packet by a smaller packet
-            byte[] buff2 = new byte[pos];
+            final int result = readLargeMessageChunk(input, scratchBuffer, minLargeMessageSize);
 
-            System.arraycopy(buff, 0, buff2, 0, pos);
+            final int bytesRead = bytesRead(result);
 
-            buff = buff2;
+            totalBytesRead += bytesRead;
 
-            // This is the case where the message is being converted as a regular message
-            if (!headerSent && session.isCompressLargeMessages() && buff2.length < minLargeMessageSize) {
-               msgI.getBodyBuffer().resetReaderIndex();
-               msgI.getBodyBuffer().resetWriterIndex();
-               msgI.putLongProperty(Message.HDR_LARGE_BODY_SIZE, deflaterReader.getTotalSize());
+            final boolean lastPacket = isLargeMessageLastPacket(result);
 
-               msgI.getBodyBuffer().writeBytes(buff, 0, pos);
-               sendRegularMessage(msgI.getAddressSimpleString(), msgI, sendBlocking, credits, handler);
-               return;
-            } else {
+            if (lastPacket) {
+               // This is the case where the message is being converted as a regular message
+               if (!headerSent && session.isCompressLargeMessages() && bytesRead < minLargeMessageSize) {
+                  assert bytesRead == totalBytesRead;
+                  msgI.getBodyBuffer().resetReaderIndex();
+                  msgI.getBodyBuffer().resetWriterIndex();
+                  msgI.putLongProperty(Message.HDR_LARGE_BODY_SIZE, deflaterReader.getTotalSize());
+                  msgI.getBodyBuffer().writeBytes(scratchBuffer, 0, bytesRead);
+                  sendRegularMessage(msgI.getAddressSimpleString(), msgI, sendBlocking, credits, handler);
+                  break;
+               }
                if (!headerSent) {
-                  headerSent = true;
                   sendInitialLargeMessageHeader(msgI, credits);
                }
-               int creditsSent = sessionContext.sendLargeMessageChunk(msgI, messageSize.get(), sendBlocking, true, buff, reconnectID, handler);
+               final long messageBodySize = deflaterReader != null ? deflaterReader.getTotalSize() : totalBytesRead;
+               int creditsSent = sessionContext.sendLargeMessageChunk(msgI, messageBodySize, sendBlocking, true, trimmedBuffer(scratchBuffer, bytesRead), reconnectID, handler);
                credits.acquireCredits(creditsSent);
+               break;
             }
-         } else {
+            // !lastPacket
             if (!headerSent) {
                headerSent = true;
                sendInitialLargeMessageHeader(msgI, credits);
             }
 
-            int creditsSent = sessionContext.sendLargeMessageChunk(msgI, messageSize.get(), sendBlocking, false, buff, reconnectID, handler);
+            int creditsSent = sessionContext.sendLargeMessageChunk(msgI, 0, sendBlocking, false, trimmedBuffer(scratchBuffer, bytesRead), reconnectID, handler);
             credits.acquireCredits(creditsSent);
          }
-      }
-
-      try {
-         input.close();
       } catch (IOException e) {
          throw ActiveMQClientMessageBundle.BUNDLE.errorClosingLargeMessage(e);
       }
    }
+
+   /**
+    * This is trimming {@code buffer} to the expected size or reusing it if not needed.
+    */
+   private static byte[] trimmedBuffer(byte[] buffer, int expectedSize) {
+      if (buffer.length == expectedSize) {
+         return buffer;
+      }
+      byte[] trimmedBuffer = new byte[expectedSize];
+      System.arraycopy(buffer, 0, trimmedBuffer, 0, expectedSize);
+      return trimmedBuffer;
+   }
+
+   private static boolean isLargeMessageLastPacket(int readResult) {
+      return readResult <= 0;
+   }
+
+   private static int bytesRead(int readResult) {
+      return readResult > 0 ? readResult : -readResult;
+   }
+
+   /**
+    * Use {@link #isLargeMessageLastPacket(int)} and {@link #bytesRead(int)} to decode the result of this method.
+    */
+   private static int readLargeMessageChunk(InputStream inputStream,
+                                            byte[] readBuffer,
+                                            int chunkLimit) throws ActiveMQLargeMessageException {
+      assert chunkLimit > 0;
+      int bytesRead = 0;
+      do {
+         final int remaining = chunkLimit - bytesRead;
+         final int read;
+         try {
+            read = inputStream.read(readBuffer, bytesRead, remaining);
+         } catch (IOException e) {
+            throw ActiveMQClientMessageBundle.BUNDLE.errorReadingBody(e);
+         }
+         if (read == -1) {
+            // bytesRead can be 0 if the stream return -1 after 0-length reads
+            return -bytesRead;
+         }
+         bytesRead += read;
+      }
+      while (bytesRead < chunkLimit);
+      assert bytesRead == chunkLimit;

Review comment:
       is this assert correct? what if you are reading a large message that Is not a multiple of chunkLimit?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org