Posted to dev@activemq.apache.org by clebertsuconic <gi...@git.apache.org> on 2018/03/22 15:21:36 UTC

[GitHub] activemq-artemis pull request #1970: Artemis 1765 Large Message fixes betwee...

GitHub user clebertsuconic opened a pull request:

    https://github.com/apache/activemq-artemis/pull/1970

    Artemis 1765 Large Message fixes between protocols

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/clebertsuconic/activemq-artemis ARTEMIS-1765

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/activemq-artemis/pull/1970.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1970
    
----
commit 60d020c2dbf009656790787d89784288d1d156ae
Author: Howard Gao <ho...@...>
Date:   2018-03-22T04:24:39Z

    ARTEMIS-1765 Adding StompWithLargeMessageTest
    
    This closes #1965

commit ad8505eb6d78547395da42f27cc3054d721bc557
Author: Clebert Suconic <cl...@...>
Date:   2018-03-22T14:40:59Z

    ARTEMIS-1765 Fixing Large Message Compression and Conversion

----


---

[GitHub] activemq-artemis pull request #1970: Artemis 1765 Large Message fixes betwee...

Posted by franz1981 <gi...@git.apache.org>.
Github user franz1981 commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1970#discussion_r176497842
  
    --- Diff: artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java ---
    @@ -213,6 +218,63 @@ public ActiveMQBuffer getReadOnlyBodyBuffer() {
           return new ChannelBufferWrapper(buffer.slice(BODY_OFFSET, endOfBodyPosition - BUFFER_HEADER_SPACE).setIndex(0, endOfBodyPosition - BUFFER_HEADER_SPACE).asReadOnly());
        }
     
    +   /**
    +    * This will return the proper buffer to represent the data of the Message. If compressed it will decompress.
    +    * If large, it will read from the file or streaming.
    +    * @return
    +    * @throws ActiveMQException
    +    */
    +   @Override
    +   public ActiveMQBuffer getDataBuffer() {
    +
    +      ActiveMQBuffer buffer;
    +
    +      try {
    +         if (isLargeMessage()) {
    +            LargeBodyEncoder encoder = getBodyEncoder();
    --- End diff --
    
    It would be better to move the uncommon path into a separate method.


---

[GitHub] activemq-artemis pull request #1970: Artemis 1765 Large Message fixes betwee...

Posted by clebertsuconic <gi...@git.apache.org>.
Github user clebertsuconic commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1970#discussion_r176503847
  
    --- Diff: artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java ---
    @@ -213,6 +218,63 @@ public ActiveMQBuffer getReadOnlyBodyBuffer() {
           return new ChannelBufferWrapper(buffer.slice(BODY_OFFSET, endOfBodyPosition - BUFFER_HEADER_SPACE).setIndex(0, endOfBodyPosition - BUFFER_HEADER_SPACE).asReadOnly());
        }
     
    +   /**
    +    * This will return the proper buffer to represent the data of the Message. If compressed it will decompress.
    +    * If large, it will read from the file or streaming.
    +    * @return
    +    * @throws ActiveMQException
    +    */
    +   @Override
    +   public ActiveMQBuffer getDataBuffer() {
    +
    +      ActiveMQBuffer buffer;
    +
    +      try {
    +         if (isLargeMessage()) {
    +            LargeBodyEncoder encoder = getBodyEncoder();
    +            encoder.open();
    +            int bodySize = (int) encoder.getLargeBodySize();
    +
    +            buffer = new ChannelBufferWrapper(UnpooledByteBufAllocator.DEFAULT.heapBuffer(bodySize));
    +
    +            encoder.encode(buffer, bodySize);
    +            encoder.close();
    +         } else {
    +            buffer = getReadOnlyBodyBuffer();
    +         }
    +
    +         if (Boolean.TRUE.equals(getBooleanProperty(Message.HDR_LARGE_COMPRESSED))) {
    +            buffer = inflate(buffer);
    +         }
    +      } catch (Exception e) {
    +         ActiveMQIOErrorException e2 = new ActiveMQIOErrorException();
    +         e2.initCause(e);
    +         logger.warn(e.getMessage(), e);
    +         return getReadOnlyBodyBuffer();
    +      }
    +
    +      return buffer;
    +   }
    +
    +   private ActiveMQBuffer inflate(ActiveMQBuffer buffer) throws DataFormatException {
    +      int bytesToRead = buffer.readableBytes();
    +      Inflater inflater = new Inflater();
    --- End diff --
    
    I'm not storing the `Inflater` anywhere. This is an edge case, and I don't want to do any major refactoring.


---

[GitHub] activemq-artemis pull request #1970: Artemis 1765 Large Message fixes betwee...

Posted by clebertsuconic <gi...@git.apache.org>.
Github user clebertsuconic commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1970#discussion_r176528336
  
    --- Diff: artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java ---
    @@ -213,6 +218,63 @@ public ActiveMQBuffer getReadOnlyBodyBuffer() {
           return new ChannelBufferWrapper(buffer.slice(BODY_OFFSET, endOfBodyPosition - BUFFER_HEADER_SPACE).setIndex(0, endOfBodyPosition - BUFFER_HEADER_SPACE).asReadOnly());
        }
     
    +   /**
    +    * This will return the proper buffer to represent the data of the Message. If compressed it will decompress.
    +    * If large, it will read from the file or streaming.
    +    * @return
    +    * @throws ActiveMQException
    +    */
    +   @Override
    +   public ActiveMQBuffer getDataBuffer() {
    +
    +      ActiveMQBuffer buffer;
    +
    +      try {
    +         if (isLargeMessage()) {
    +            LargeBodyEncoder encoder = getBodyEncoder();
    +            encoder.open();
    +            int bodySize = (int) encoder.getLargeBodySize();
    +
    +            buffer = new ChannelBufferWrapper(UnpooledByteBufAllocator.DEFAULT.heapBuffer(bodySize));
    +
    +            encoder.encode(buffer, bodySize);
    +            encoder.close();
    +         } else {
    +            buffer = getReadOnlyBodyBuffer();
    +         }
    +
    +         if (Boolean.TRUE.equals(getBooleanProperty(Message.HDR_LARGE_COMPRESSED))) {
    +            buffer = inflate(buffer);
    +         }
    +      } catch (Exception e) {
    +         ActiveMQIOErrorException e2 = new ActiveMQIOErrorException();
    --- End diff --
    
    OK, fixed. I had a mistake in this exception handling anyway. I also broke out the method you mentioned. Good catch.


---

[GitHub] activemq-artemis pull request #1970: Artemis 1765 Large Message fixes betwee...

Posted by franz1981 <gi...@git.apache.org>.
Github user franz1981 commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1970#discussion_r176497457
  
    --- Diff: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java ---
    @@ -446,7 +449,15 @@ public void internalSimpleSend(int protocolSender, int protocolConsumer) throws
              TextMessage msg = session.createTextMessage("hello");
              msg.setIntProperty("mycount", 0);
              producer.send(msg);
    -         connection.close();
    +
    +         StringBuffer bufferLarge = new StringBuffer();
    --- End diff --
    
    It would be better to use `StringBuilder` instead, because the methods of `StringBuffer` are `synchronized`.
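
To illustrate the suggestion, here is a minimal sketch of building a large text body with `StringBuilder`. The class name `LargeBodyTextSketch`, the helper `buildBody`, and the 200 KiB size are assumptions made for illustration; they are not taken from `ConsumerTest`.

```java
class LargeBodyTextSketch {

    // Hypothetical helper: builds a text payload of the requested length.
    // StringBuilder is not synchronized, which is fine for a local variable
    // that never escapes the current thread.
    static String buildBody(int length) {
        StringBuilder sb = new StringBuilder(length);
        for (int i = 0; i < length; i++) {
            sb.append((char) ('a' + (i % 26)));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // 200 KiB is an arbitrary size chosen for this sketch.
        String body = buildBody(200 * 1024);
        System.out.println(body.length());
    }
}
```

Because the builder is confined to one method, the locking done by `StringBuffer` buys nothing here.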


---

[GitHub] activemq-artemis pull request #1970: Artemis 1765 Large Message fixes betwee...

Posted by clebertsuconic <gi...@git.apache.org>.
Github user clebertsuconic commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1970#discussion_r176503556
  
    --- Diff: artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java ---
    @@ -213,6 +218,63 @@ public ActiveMQBuffer getReadOnlyBodyBuffer() {
           return new ChannelBufferWrapper(buffer.slice(BODY_OFFSET, endOfBodyPosition - BUFFER_HEADER_SPACE).setIndex(0, endOfBodyPosition - BUFFER_HEADER_SPACE).asReadOnly());
        }
     
    +   /**
    +    * This will return the proper buffer to represent the data of the Message. If compressed it will decompress.
    +    * If large, it will read from the file or streaming.
    +    * @return
    +    * @throws ActiveMQException
    +    */
    +   @Override
    +   public ActiveMQBuffer getDataBuffer() {
    +
    +      ActiveMQBuffer buffer;
    +
    +      try {
    +         if (isLargeMessage()) {
    +            LargeBodyEncoder encoder = getBodyEncoder();
    --- End diff --
    
    A compressed large message is already uncommon.


---

[GitHub] activemq-artemis pull request #1970: Artemis 1765 Large Message fixes betwee...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/activemq-artemis/pull/1970


---

[GitHub] activemq-artemis pull request #1970: Artemis 1765 Large Message fixes betwee...

Posted by clebertsuconic <gi...@git.apache.org>.
Github user clebertsuconic commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1970#discussion_r176507140
  
    --- Diff: artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java ---
    @@ -213,6 +218,63 @@ public ActiveMQBuffer getReadOnlyBodyBuffer() {
           return new ChannelBufferWrapper(buffer.slice(BODY_OFFSET, endOfBodyPosition - BUFFER_HEADER_SPACE).setIndex(0, endOfBodyPosition - BUFFER_HEADER_SPACE).asReadOnly());
        }
     
    +   /**
    +    * This will return the proper buffer to represent the data of the Message. If compressed it will decompress.
    +    * If large, it will read from the file or streaming.
    +    * @return
    +    * @throws ActiveMQException
    +    */
    +   @Override
    +   public ActiveMQBuffer getDataBuffer() {
    +
    +      ActiveMQBuffer buffer;
    +
    +      try {
    +         if (isLargeMessage()) {
    +            LargeBodyEncoder encoder = getBodyEncoder();
    +            encoder.open();
    +            int bodySize = (int) encoder.getLargeBodySize();
    +
    +            buffer = new ChannelBufferWrapper(UnpooledByteBufAllocator.DEFAULT.heapBuffer(bodySize));
    +
    +            encoder.encode(buffer, bodySize);
    +            encoder.close();
    +         } else {
    +            buffer = getReadOnlyBodyBuffer();
    +         }
    +
    +         if (Boolean.TRUE.equals(getBooleanProperty(Message.HDR_LARGE_COMPRESSED))) {
    +            buffer = inflate(buffer);
    +         }
    +      } catch (Exception e) {
    +         ActiveMQIOErrorException e2 = new ActiveMQIOErrorException();
    +         e2.initCause(e);
    +         logger.warn(e.getMessage(), e);
    +         return getReadOnlyBodyBuffer();
    +      }
    +
    +      return buffer;
    +   }
    +
    +   private ActiveMQBuffer inflate(ActiveMQBuffer buffer) throws DataFormatException {
    +      int bytesToRead = buffer.readableBytes();
    +      Inflater inflater = new Inflater();
    +      inflater.setInput(ByteUtil.getActiveArray(buffer.readBytes(bytesToRead).toByteBuffer()));
    --- End diff --
    
    It is, unless you want to refactor it and implement proper inflation directly over the buffer for an edge case.
    
    This just brings code that currently exists only for STOMP to the other protocols.


---

[GitHub] activemq-artemis pull request #1970: Artemis 1765 Large Message fixes betwee...

Posted by franz1981 <gi...@git.apache.org>.
Github user franz1981 commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1970#discussion_r176494666
  
    --- Diff: artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java ---
    @@ -213,6 +218,63 @@ public ActiveMQBuffer getReadOnlyBodyBuffer() {
           return new ChannelBufferWrapper(buffer.slice(BODY_OFFSET, endOfBodyPosition - BUFFER_HEADER_SPACE).setIndex(0, endOfBodyPosition - BUFFER_HEADER_SPACE).asReadOnly());
        }
     
    +   /**
    +    * This will return the proper buffer to represent the data of the Message. If compressed it will decompress.
    +    * If large, it will read from the file or streaming.
    +    * @return
    +    * @throws ActiveMQException
    +    */
    +   @Override
    +   public ActiveMQBuffer getDataBuffer() {
    +
    +      ActiveMQBuffer buffer;
    +
    +      try {
    +         if (isLargeMessage()) {
    +            LargeBodyEncoder encoder = getBodyEncoder();
    +            encoder.open();
    +            int bodySize = (int) encoder.getLargeBodySize();
    +
    +            buffer = new ChannelBufferWrapper(UnpooledByteBufAllocator.DEFAULT.heapBuffer(bodySize));
    +
    +            encoder.encode(buffer, bodySize);
    +            encoder.close();
    +         } else {
    +            buffer = getReadOnlyBodyBuffer();
    +         }
    +
    +         if (Boolean.TRUE.equals(getBooleanProperty(Message.HDR_LARGE_COMPRESSED))) {
    +            buffer = inflate(buffer);
    +         }
    +      } catch (Exception e) {
    +         ActiveMQIOErrorException e2 = new ActiveMQIOErrorException();
    +         e2.initCause(e);
    +         logger.warn(e.getMessage(), e);
    +         return getReadOnlyBodyBuffer();
    +      }
    +
    +      return buffer;
    +   }
    +
    +   private ActiveMQBuffer inflate(ActiveMQBuffer buffer) throws DataFormatException {
    +      int bytesToRead = buffer.readableBytes();
    +      Inflater inflater = new Inflater();
    --- End diff --
    
    The `Inflater` should be passed in as a parameter so that a future API can pool it (it is not a lightweight object).
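
A rough sketch of what passing the `Inflater` in could look like, using only `java.util.zip`. The class `ReusableInflaterSketch` and both helper methods are hypothetical and work on plain `byte[]` rather than `ActiveMQBuffer`; how an instance would actually be pooled is left open.

```java
import java.io.ByteArrayOutputStream;
import java.util.Arrays;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

class ReusableInflaterSketch {

    // Hypothetical helper: the caller owns the Inflater, so it can be reused or pooled.
    static byte[] inflate(Inflater inflater, byte[] compressed) throws DataFormatException {
        inflater.reset(); // clear any state left over from a previous use
        inflater.setInput(compressed);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] chunk = new byte[1024];
        while (!inflater.finished()) {
            int n = inflater.inflate(chunk);
            if (n == 0 && (inflater.needsInput() || inflater.needsDictionary())) {
                throw new DataFormatException("truncated or unsupported input");
            }
            out.write(chunk, 0, n);
        }
        return out.toByteArray();
    }

    // Test-data helper for this sketch only.
    static byte[] deflate(byte[] raw) {
        Deflater deflater = new Deflater();
        deflater.setInput(raw);
        deflater.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] chunk = new byte[1024];
        while (!deflater.finished()) {
            out.write(chunk, 0, deflater.deflate(chunk));
        }
        deflater.end();
        return out.toByteArray();
    }

    public static void main(String[] args) throws Exception {
        byte[] raw = new byte[4096];
        Arrays.fill(raw, (byte) 'x');
        Inflater shared = new Inflater();
        try {
            // The same instance handles two messages in a row.
            byte[] first = inflate(shared, deflate(raw));
            byte[] second = inflate(shared, deflate(raw));
            System.out.println(Arrays.equals(raw, first) && Arrays.equals(raw, second));
        } finally {
            shared.end(); // release native resources once the instance is retired
        }
    }
}
```

`Inflater.reset()` is what makes reuse possible; `end()` is called only when the instance is discarded for good.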


---

[GitHub] activemq-artemis pull request #1970: Artemis 1765 Large Message fixes betwee...

Posted by franz1981 <gi...@git.apache.org>.
Github user franz1981 commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1970#discussion_r176498277
  
    --- Diff: artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java ---
    @@ -213,6 +218,63 @@ public ActiveMQBuffer getReadOnlyBodyBuffer() {
           return new ChannelBufferWrapper(buffer.slice(BODY_OFFSET, endOfBodyPosition - BUFFER_HEADER_SPACE).setIndex(0, endOfBodyPosition - BUFFER_HEADER_SPACE).asReadOnly());
        }
     
    +   /**
    +    * This will return the proper buffer to represent the data of the Message. If compressed it will decompress.
    +    * If large, it will read from the file or streaming.
    +    * @return
    +    * @throws ActiveMQException
    +    */
    +   @Override
    +   public ActiveMQBuffer getDataBuffer() {
    +
    +      ActiveMQBuffer buffer;
    +
    +      try {
    +         if (isLargeMessage()) {
    +            LargeBodyEncoder encoder = getBodyEncoder();
    +            encoder.open();
    +            int bodySize = (int) encoder.getLargeBodySize();
    +
    +            buffer = new ChannelBufferWrapper(UnpooledByteBufAllocator.DEFAULT.heapBuffer(bodySize));
    +
    +            encoder.encode(buffer, bodySize);
    +            encoder.close();
    +         } else {
    +            buffer = getReadOnlyBodyBuffer();
    +         }
    +
    +         if (Boolean.TRUE.equals(getBooleanProperty(Message.HDR_LARGE_COMPRESSED))) {
    +            buffer = inflate(buffer);
    +         }
    +      } catch (Exception e) {
    +         ActiveMQIOErrorException e2 = new ActiveMQIOErrorException();
    --- End diff --
    
    The exception handling could be moved into a separate method too, because it is the uncommon path.


---

[GitHub] activemq-artemis pull request #1970: Artemis 1765 Large Message fixes betwee...

Posted by franz1981 <gi...@git.apache.org>.
Github user franz1981 commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1970#discussion_r176509907
  
    --- Diff: artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java ---
    @@ -213,6 +218,63 @@ public ActiveMQBuffer getReadOnlyBodyBuffer() {
           return new ChannelBufferWrapper(buffer.slice(BODY_OFFSET, endOfBodyPosition - BUFFER_HEADER_SPACE).setIndex(0, endOfBodyPosition - BUFFER_HEADER_SPACE).asReadOnly());
        }
     
    +   /**
    +    * This will return the proper buffer to represent the data of the Message. If compressed it will decompress.
    +    * If large, it will read from the file or streaming.
    +    * @return
    +    * @throws ActiveMQException
    +    */
    +   @Override
    +   public ActiveMQBuffer getDataBuffer() {
    +
    +      ActiveMQBuffer buffer;
    +
    +      try {
    +         if (isLargeMessage()) {
    +            LargeBodyEncoder encoder = getBodyEncoder();
    --- End diff --
    
    I know, but `getDataBuffer` is used on the OpenWire hot path and could be made smaller to help the JVM.
    In cases like this it would be good to create:
    ```
    private ActiveMQBuffer getLargeMessageDataBuffer(){
        //the code for the large message case
    }
    ```
    and let the calling method become smaller thanks to this refactoring. What do you think?
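
The refactoring proposed above can be sketched in a self-contained form. Everything here is a stand-in: `HotPathSketch` is a hypothetical class, `java.nio.ByteBuffer` replaces `ActiveMQBuffer`, and the "large" branch only copies an in-memory array instead of reading from a file. The point is the shape: HotSpot uses bytecode-size limits when deciding what to inline, so keeping the common case in a short method and pushing the rare case into its own method may help the JIT.

```java
import java.nio.ByteBuffer;

class HotPathSketch {
    private final byte[] body;
    private final boolean large;

    HotPathSketch(byte[] body, boolean large) {
        this.body = body;
        this.large = large;
    }

    // Hot path: stays short because the rare branch is a single call.
    ByteBuffer getDataBuffer() {
        if (large) {
            return getLargeMessageDataBuffer();
        }
        return ByteBuffer.wrap(body).asReadOnlyBuffer();
    }

    // Cold path: the bulky, rarely executed code lives here.
    // In this sketch it just copies the bytes into a fresh heap buffer.
    private ByteBuffer getLargeMessageDataBuffer() {
        ByteBuffer copy = ByteBuffer.allocate(body.length);
        copy.put(body);
        copy.flip();
        return copy;
    }

    public static void main(String[] args) {
        byte[] data = {1, 2, 3, 4, 5};
        System.out.println(new HotPathSketch(data, false).getDataBuffer().remaining());
        System.out.println(new HotPathSketch(data, true).getDataBuffer().remaining());
    }
}
```

Behaviour is unchanged by the split; only the size of the frequently called method shrinks.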


---

[GitHub] activemq-artemis pull request #1970: Artemis 1765 Large Message fixes betwee...

Posted by franz1981 <gi...@git.apache.org>.
Github user franz1981 commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1970#discussion_r176496551
  
    --- Diff: artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java ---
    @@ -213,6 +218,63 @@ public ActiveMQBuffer getReadOnlyBodyBuffer() {
           return new ChannelBufferWrapper(buffer.slice(BODY_OFFSET, endOfBodyPosition - BUFFER_HEADER_SPACE).setIndex(0, endOfBodyPosition - BUFFER_HEADER_SPACE).asReadOnly());
        }
     
    +   /**
    +    * This will return the proper buffer to represent the data of the Message. If compressed it will decompress.
    +    * If large, it will read from the file or streaming.
    +    * @return
    +    * @throws ActiveMQException
    +    */
    +   @Override
    +   public ActiveMQBuffer getDataBuffer() {
    +
    +      ActiveMQBuffer buffer;
    +
    +      try {
    +         if (isLargeMessage()) {
    +            LargeBodyEncoder encoder = getBodyEncoder();
    +            encoder.open();
    +            int bodySize = (int) encoder.getLargeBodySize();
    +
    +            buffer = new ChannelBufferWrapper(UnpooledByteBufAllocator.DEFAULT.heapBuffer(bodySize));
    +
    +            encoder.encode(buffer, bodySize);
    +            encoder.close();
    +         } else {
    +            buffer = getReadOnlyBodyBuffer();
    +         }
    +
    +         if (Boolean.TRUE.equals(getBooleanProperty(Message.HDR_LARGE_COMPRESSED))) {
    +            buffer = inflate(buffer);
    +         }
    +      } catch (Exception e) {
    +         ActiveMQIOErrorException e2 = new ActiveMQIOErrorException();
    +         e2.initCause(e);
    +         logger.warn(e.getMessage(), e);
    +         return getReadOnlyBodyBuffer();
    +      }
    +
    +      return buffer;
    +   }
    +
    +   private ActiveMQBuffer inflate(ActiveMQBuffer buffer) throws DataFormatException {
    +      int bytesToRead = buffer.readableBytes();
    +      Inflater inflater = new Inflater();
    +      inflater.setInput(ByteUtil.getActiveArray(buffer.readBytes(bytesToRead).toByteBuffer()));
    --- End diff --
    
    Is it necessary to make so many copies of the data?
    Both `setInput` and `inflate` have range-based versions `*(byte[] b, int off, int len)` that could be used to reduce the number of copies.
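
A minimal sketch of the range-based calls, assuming the compressed bytes sit inside a larger backing array and that the uncompressed size is known up front (both are assumptions of this example, not facts about `CoreMessage`). The class `RangeInflateSketch` and its methods are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.util.Arrays;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

class RangeInflateSketch {

    // Inflates src[off, off + len) straight into a correctly sized array,
    // with no intermediate copy of the compressed input.
    static byte[] inflateRange(byte[] src, int off, int len, int uncompressedSize)
            throws DataFormatException {
        Inflater inflater = new Inflater();
        try {
            inflater.setInput(src, off, len);
            byte[] dst = new byte[uncompressedSize];
            int written = 0;
            while (written < dst.length && !inflater.finished()) {
                int n = inflater.inflate(dst, written, dst.length - written);
                if (n == 0 && (inflater.needsInput() || inflater.needsDictionary())) {
                    throw new DataFormatException("truncated or unsupported input");
                }
                written += n;
            }
            if (written != uncompressedSize) {
                throw new DataFormatException("expected " + uncompressedSize + " bytes, got " + written);
            }
            return dst;
        } finally {
            inflater.end();
        }
    }

    // Test-data helper for this sketch only.
    static byte[] deflate(byte[] raw) {
        Deflater deflater = new Deflater();
        deflater.setInput(raw);
        deflater.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] chunk = new byte[1024];
        while (!deflater.finished()) {
            out.write(chunk, 0, deflater.deflate(chunk));
        }
        deflater.end();
        return out.toByteArray();
    }

    public static void main(String[] args) throws Exception {
        byte[] raw = new byte[2048];
        Arrays.fill(raw, (byte) 'z');
        byte[] packed = deflate(raw);
        // Simulate a backing array with unrelated bytes before and after the payload.
        byte[] framed = new byte[packed.length + 16];
        System.arraycopy(packed, 0, framed, 8, packed.length);
        System.out.println(Arrays.equals(raw, inflateRange(framed, 8, packed.length, raw.length)));
    }
}
```

Whether this applies directly depends on the buffer exposing its backing array and offset, which a read-only or direct buffer may not do.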


---