You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/09/11 20:55:24 UTC

activemq-artemis git commit: ARTEMIS-2068 save reading any file to get AMQP large msg size

Repository: activemq-artemis
Updated Branches:
  refs/heads/2.6.x 94a230005 -> 74317ef9c


ARTEMIS-2068 save reading any file to get AMQP large msg size

ServerJMSBytesMessage::getBodyLength can save reading
the whole large message file by reading just its
file size

(cherry picked from commit 3c7252adbca1d497e0c89dd8247f0b2f20e2425e)


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/74317ef9
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/74317ef9
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/74317ef9

Branch: refs/heads/2.6.x
Commit: 74317ef9c433d580396a7ddc32fd7cd9c5676df5
Parents: 94a2300
Author: Francesco Nigro <ni...@gmail.com>
Authored: Mon Sep 3 08:57:11 2018 +0200
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue Sep 11 16:55:18 2018 -0400

----------------------------------------------------------------------
 .../activemq/artemis/api/core/ICoreMessage.java |  5 ++++
 .../artemis/core/message/impl/CoreMessage.java  |  7 ++++++
 .../artemis/message/CoreMessageTest.java        |  8 ++++++
 .../converter/jms/ServerJMSBytesMessage.java    |  2 +-
 .../impl/journal/LargeServerMessageImpl.java    | 25 +++++++++++++++++++
 .../integration/client/LargeMessageTest.java    | 26 ++++++++++++++++++++
 6 files changed, 72 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/74317ef9/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ICoreMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ICoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ICoreMessage.java
index f0eb1b6..66c4cdf 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ICoreMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ICoreMessage.java
@@ -42,6 +42,11 @@ public interface ICoreMessage extends Message {
    ActiveMQBuffer getReadOnlyBodyBuffer();
 
    /**
+    * Returns the length in bytes of the body buffer.
+    */
+   int getBodyBufferSize();
+
+   /**
     * Returns a readOnlyBodyBuffer or a decompressed one if the message is compressed.
     * or the large message buffer.
     * @return

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/74317ef9/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
index 323d9f4..5272200 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
@@ -217,6 +217,13 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
       return new ChannelBufferWrapper(buffer.slice(BODY_OFFSET, endOfBodyPosition - BUFFER_HEADER_SPACE).setIndex(0, endOfBodyPosition - BUFFER_HEADER_SPACE).asReadOnly());
    }
 
+   @Override
+   public int getBodyBufferSize() {
+      checkEncode();
+      internalWritableBuffer();
+      return endOfBodyPosition - BUFFER_HEADER_SPACE;
+   }
+
    /**
     * This will return the proper buffer to represent the data of the Message. If compressed it will decompress.
     * If large, it will read from the file or streaming.

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/74317ef9/artemis-core-client/src/test/java/org/apache/activemq/artemis/message/CoreMessageTest.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/test/java/org/apache/activemq/artemis/message/CoreMessageTest.java b/artemis-core-client/src/test/java/org/apache/activemq/artemis/message/CoreMessageTest.java
index 310b4ed..5ba2a5b 100644
--- a/artemis-core-client/src/test/java/org/apache/activemq/artemis/message/CoreMessageTest.java
+++ b/artemis-core-client/src/test/java/org/apache/activemq/artemis/message/CoreMessageTest.java
@@ -82,6 +82,14 @@ public class CoreMessageTest {
       Assert.assertEquals(TEXT, TextMessageUtil.readBodyText(decodedMessage.getReadOnlyBodyBuffer()).toString());
    }
 
+   @Test
+   public void testBodyBufferSize() {
+      final CoreMessage decodedMessage = decodeMessage();
+      final int bodyBufferSize = decodedMessage.getBodyBufferSize();
+      final int readonlyBodyBufferReadableBytes = decodedMessage.getReadOnlyBodyBuffer().readableBytes();
+      Assert.assertEquals(bodyBufferSize, readonlyBodyBufferReadableBytes);
+   }
+
    /** The message is received, then sent to the other side untouched */
    @Test
    public void sendThroughPackets() {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/74317ef9/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSBytesMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSBytesMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSBytesMessage.java
index a94cfde..f7f2a0d 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSBytesMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSBytesMessage.java
@@ -54,7 +54,7 @@ public class ServerJMSBytesMessage extends ServerJMSMessage implements BytesMess
 
    @Override
    public long getBodyLength() throws JMSException {
-      return message.getReadOnlyBodyBuffer().readableBytes();
+      return message.getBodyBufferSize();
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/74317ef9/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
index d940946..257141e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
@@ -224,6 +224,31 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe
    }
 
    @Override
+   public int getBodyBufferSize() {
+      final boolean closeFile = file == null || !file.isOpen();
+      try {
+         openFile();
+         final long fileSize = file.size();
+         int fileSizeAsInt = (int) fileSize;
+         if (fileSizeAsInt < 0) {
+            logger.warnf("suspicious large message file size of %d bytes for %s, will use %d instead.",
+                         fileSize, file.getFileName(), Integer.MAX_VALUE);
+            fileSizeAsInt = Integer.MAX_VALUE;
+         }
+         return fileSizeAsInt;
+      } catch (Exception e) {
+         throw new RuntimeException(e);
+      } finally {
+         if (closeFile) {
+            try {
+               file.close();
+            } catch (Exception ignored) {
+            }
+         }
+      }
+   }
+
+   @Override
    public boolean isLargeMessage() {
       return true;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/74317ef9/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
index 025d00a..1d9075d 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
@@ -2289,6 +2289,32 @@ public class LargeMessageTest extends LargeMessageTestBase {
       log.debug("Thread done");
    }
 
+   @Test
+   public void testLargeMessageBodySize() throws Exception {
+      ActiveMQServer server = createServer(true, isNetty(), storeType);
+
+      server.start();
+
+      LargeServerMessageImpl fileMessage = new LargeServerMessageImpl((JournalStorageManager) server.getStorageManager());
+
+      fileMessage.setMessageID(1005);
+
+      Assert.assertEquals(0, fileMessage.getBodyBufferSize());
+
+      for (int i = 0; i < largeMessageSize; i++) {
+         fileMessage.addBytes(new byte[]{ActiveMQTestBase.getSamplebyte(i)});
+      }
+
+      Assert.assertEquals(largeMessageSize, fileMessage.getBodyBufferSize());
+
+      // The server would be doing this
+      fileMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, largeMessageSize);
+
+      fileMessage.releaseResources();
+
+      Assert.assertEquals(largeMessageSize, fileMessage.getBodyBufferSize());
+   }
+
    // The ClientConsumer should be able to also send ServerLargeMessages as that's done by the CoreBridge
    @Test
    public void testSendServerMessage() throws Exception {