You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@activemq.apache.org by clebertsuconic <gi...@git.apache.org> on 2018/03/22 15:21:36 UTC
[GitHub] activemq-artemis pull request #1970: Artemis 1765 Large Mesasge fixes betwee...
GitHub user clebertsuconic opened a pull request:
https://github.com/apache/activemq-artemis/pull/1970
Artemis 1765 Large Mesasge fixes between protocols
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/clebertsuconic/activemq-artemis ARTEMIS-1765
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/activemq-artemis/pull/1970.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #1970
----
commit 60d020c2dbf009656790787d89784288d1d156ae
Author: Howard Gao <ho...@...>
Date: 2018-03-22T04:24:39Z
ARTEMIS-1765 Adding StompWithLargeMessageTest
This closes #1965
commit ad8505eb6d78547395da42f27cc3054d721bc557
Author: Clebert Suconic <cl...@...>
Date: 2018-03-22T14:40:59Z
ARTEMIS-1765 Fixing Large Message Compression and Conversion
----
---
[GitHub] activemq-artemis pull request #1970: Artemis 1765 Large Mesasge fixes betwee...
Posted by franz1981 <gi...@git.apache.org>.
Github user franz1981 commented on a diff in the pull request:
https://github.com/apache/activemq-artemis/pull/1970#discussion_r176497842
--- Diff: artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java ---
@@ -213,6 +218,63 @@ public ActiveMQBuffer getReadOnlyBodyBuffer() {
return new ChannelBufferWrapper(buffer.slice(BODY_OFFSET, endOfBodyPosition - BUFFER_HEADER_SPACE).setIndex(0, endOfBodyPosition - BUFFER_HEADER_SPACE).asReadOnly());
}
+ /**
+ * This will return the proper buffer to represent the data of the Message. If compressed it will decompress.
+ * If large, it will read from the file or streaming.
+ * @return
+ * @throws ActiveMQException
+ */
+ @Override
+ public ActiveMQBuffer getDataBuffer() {
+
+ ActiveMQBuffer buffer;
+
+ try {
+ if (isLargeMessage()) {
+ LargeBodyEncoder encoder = getBodyEncoder();
--- End diff --
would be better to pack the uncommon path into a separate method
---
[GitHub] activemq-artemis pull request #1970: Artemis 1765 Large Mesasge fixes betwee...
Posted by clebertsuconic <gi...@git.apache.org>.
Github user clebertsuconic commented on a diff in the pull request:
https://github.com/apache/activemq-artemis/pull/1970#discussion_r176503847
--- Diff: artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java ---
@@ -213,6 +218,63 @@ public ActiveMQBuffer getReadOnlyBodyBuffer() {
return new ChannelBufferWrapper(buffer.slice(BODY_OFFSET, endOfBodyPosition - BUFFER_HEADER_SPACE).setIndex(0, endOfBodyPosition - BUFFER_HEADER_SPACE).asReadOnly());
}
+ /**
+ * This will return the proper buffer to represent the data of the Message. If compressed it will decompress.
+ * If large, it will read from the file or streaming.
+ * @return
+ * @throws ActiveMQException
+ */
+ @Override
+ public ActiveMQBuffer getDataBuffer() {
+
+ ActiveMQBuffer buffer;
+
+ try {
+ if (isLargeMessage()) {
+ LargeBodyEncoder encoder = getBodyEncoder();
+ encoder.open();
+ int bodySize = (int) encoder.getLargeBodySize();
+
+ buffer = new ChannelBufferWrapper(UnpooledByteBufAllocator.DEFAULT.heapBuffer(bodySize));
+
+ encoder.encode(buffer, bodySize);
+ encoder.close();
+ } else {
+ buffer = getReadOnlyBodyBuffer();
+ }
+
+ if (Boolean.TRUE.equals(getBooleanProperty(Message.HDR_LARGE_COMPRESSED))) {
+ buffer = inflate(buffer);
+ }
+ } catch (Exception e) {
+ ActiveMQIOErrorException e2 = new ActiveMQIOErrorException();
+ e2.initCause(e);
+ logger.warn(e.getMessage(), e);
+ return getReadOnlyBodyBuffer();
+ }
+
+ return buffer;
+ }
+
+ private ActiveMQBuffer inflate(ActiveMQBuffer buffer) throws DataFormatException {
+ int bytesToRead = buffer.readableBytes();
+ Inflater inflater = new Inflater();
--- End diff --
I’m not storing inflated anywhere. This is an edge case and I don’t want to create any major refactoring.
---
[GitHub] activemq-artemis pull request #1970: Artemis 1765 Large Mesasge fixes betwee...
Posted by clebertsuconic <gi...@git.apache.org>.
Github user clebertsuconic commented on a diff in the pull request:
https://github.com/apache/activemq-artemis/pull/1970#discussion_r176528336
--- Diff: artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java ---
@@ -213,6 +218,63 @@ public ActiveMQBuffer getReadOnlyBodyBuffer() {
return new ChannelBufferWrapper(buffer.slice(BODY_OFFSET, endOfBodyPosition - BUFFER_HEADER_SPACE).setIndex(0, endOfBodyPosition - BUFFER_HEADER_SPACE).asReadOnly());
}
+ /**
+ * This will return the proper buffer to represent the data of the Message. If compressed it will decompress.
+ * If large, it will read from the file or streaming.
+ * @return
+ * @throws ActiveMQException
+ */
+ @Override
+ public ActiveMQBuffer getDataBuffer() {
+
+ ActiveMQBuffer buffer;
+
+ try {
+ if (isLargeMessage()) {
+ LargeBodyEncoder encoder = getBodyEncoder();
+ encoder.open();
+ int bodySize = (int) encoder.getLargeBodySize();
+
+ buffer = new ChannelBufferWrapper(UnpooledByteBufAllocator.DEFAULT.heapBuffer(bodySize));
+
+ encoder.encode(buffer, bodySize);
+ encoder.close();
+ } else {
+ buffer = getReadOnlyBodyBuffer();
+ }
+
+ if (Boolean.TRUE.equals(getBooleanProperty(Message.HDR_LARGE_COMPRESSED))) {
+ buffer = inflate(buffer);
+ }
+ } catch (Exception e) {
+ ActiveMQIOErrorException e2 = new ActiveMQIOErrorException();
--- End diff --
ok.. fixed.. I had a mistake on this exception anyways.. I also broke the method you mentioned.. good catch
---
[GitHub] activemq-artemis pull request #1970: Artemis 1765 Large Mesasge fixes betwee...
Posted by franz1981 <gi...@git.apache.org>.
Github user franz1981 commented on a diff in the pull request:
https://github.com/apache/activemq-artemis/pull/1970#discussion_r176497457
--- Diff: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java ---
@@ -446,7 +449,15 @@ public void internalSimpleSend(int protocolSender, int protocolConsumer) throws
TextMessage msg = session.createTextMessage("hello");
msg.setIntProperty("mycount", 0);
producer.send(msg);
- connection.close();
+
+ StringBuffer bufferLarge = new StringBuffer();
--- End diff --
Would be better to use `StringBuilder` instead, because `StringBuffer` is `synchronized`
---
[GitHub] activemq-artemis pull request #1970: Artemis 1765 Large Mesasge fixes betwee...
Posted by clebertsuconic <gi...@git.apache.org>.
Github user clebertsuconic commented on a diff in the pull request:
https://github.com/apache/activemq-artemis/pull/1970#discussion_r176503556
--- Diff: artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java ---
@@ -213,6 +218,63 @@ public ActiveMQBuffer getReadOnlyBodyBuffer() {
return new ChannelBufferWrapper(buffer.slice(BODY_OFFSET, endOfBodyPosition - BUFFER_HEADER_SPACE).setIndex(0, endOfBodyPosition - BUFFER_HEADER_SPACE).asReadOnly());
}
+ /**
+ * This will return the proper buffer to represent the data of the Message. If compressed it will decompress.
+ * If large, it will read from the file or streaming.
+ * @return
+ * @throws ActiveMQException
+ */
+ @Override
+ public ActiveMQBuffer getDataBuffer() {
+
+ ActiveMQBuffer buffer;
+
+ try {
+ if (isLargeMessage()) {
+ LargeBodyEncoder encoder = getBodyEncoder();
--- End diff --
Compressed large message is already uncommon.
---
[GitHub] activemq-artemis pull request #1970: Artemis 1765 Large Message fixes betwee...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/activemq-artemis/pull/1970
---
[GitHub] activemq-artemis pull request #1970: Artemis 1765 Large Mesasge fixes betwee...
Posted by clebertsuconic <gi...@git.apache.org>.
Github user clebertsuconic commented on a diff in the pull request:
https://github.com/apache/activemq-artemis/pull/1970#discussion_r176507140
--- Diff: artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java ---
@@ -213,6 +218,63 @@ public ActiveMQBuffer getReadOnlyBodyBuffer() {
return new ChannelBufferWrapper(buffer.slice(BODY_OFFSET, endOfBodyPosition - BUFFER_HEADER_SPACE).setIndex(0, endOfBodyPosition - BUFFER_HEADER_SPACE).asReadOnly());
}
+ /**
+ * This will return the proper buffer to represent the data of the Message. If compressed it will decompress.
+ * If large, it will read from the file or streaming.
+ * @return
+ * @throws ActiveMQException
+ */
+ @Override
+ public ActiveMQBuffer getDataBuffer() {
+
+ ActiveMQBuffer buffer;
+
+ try {
+ if (isLargeMessage()) {
+ LargeBodyEncoder encoder = getBodyEncoder();
+ encoder.open();
+ int bodySize = (int) encoder.getLargeBodySize();
+
+ buffer = new ChannelBufferWrapper(UnpooledByteBufAllocator.DEFAULT.heapBuffer(bodySize));
+
+ encoder.encode(buffer, bodySize);
+ encoder.close();
+ } else {
+ buffer = getReadOnlyBodyBuffer();
+ }
+
+ if (Boolean.TRUE.equals(getBooleanProperty(Message.HDR_LARGE_COMPRESSED))) {
+ buffer = inflate(buffer);
+ }
+ } catch (Exception e) {
+ ActiveMQIOErrorException e2 = new ActiveMQIOErrorException();
+ e2.initCause(e);
+ logger.warn(e.getMessage(), e);
+ return getReadOnlyBodyBuffer();
+ }
+
+ return buffer;
+ }
+
+ private ActiveMQBuffer inflate(ActiveMQBuffer buffer) throws DataFormatException {
+ int bytesToRead = buffer.readableBytes();
+ Inflater inflater = new Inflater();
+ inflater.setInput(ByteUtil.getActiveArray(buffer.readBytes(bytesToRead).toByteBuffer()));
--- End diff --
It is. Unless you want to refactor it and implement proper inflated over the buffer for an edge case.
This is just bringing code that is on stomp only to other protocols.
---
[GitHub] activemq-artemis pull request #1970: Artemis 1765 Large Mesasge fixes betwee...
Posted by franz1981 <gi...@git.apache.org>.
Github user franz1981 commented on a diff in the pull request:
https://github.com/apache/activemq-artemis/pull/1970#discussion_r176494666
--- Diff: artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java ---
@@ -213,6 +218,63 @@ public ActiveMQBuffer getReadOnlyBodyBuffer() {
return new ChannelBufferWrapper(buffer.slice(BODY_OFFSET, endOfBodyPosition - BUFFER_HEADER_SPACE).setIndex(0, endOfBodyPosition - BUFFER_HEADER_SPACE).asReadOnly());
}
+ /**
+ * This will return the proper buffer to represent the data of the Message. If compressed it will decompress.
+ * If large, it will read from the file or streaming.
+ * @return
+ * @throws ActiveMQException
+ */
+ @Override
+ public ActiveMQBuffer getDataBuffer() {
+
+ ActiveMQBuffer buffer;
+
+ try {
+ if (isLargeMessage()) {
+ LargeBodyEncoder encoder = getBodyEncoder();
+ encoder.open();
+ int bodySize = (int) encoder.getLargeBodySize();
+
+ buffer = new ChannelBufferWrapper(UnpooledByteBufAllocator.DEFAULT.heapBuffer(bodySize));
+
+ encoder.encode(buffer, bodySize);
+ encoder.close();
+ } else {
+ buffer = getReadOnlyBodyBuffer();
+ }
+
+ if (Boolean.TRUE.equals(getBooleanProperty(Message.HDR_LARGE_COMPRESSED))) {
+ buffer = inflate(buffer);
+ }
+ } catch (Exception e) {
+ ActiveMQIOErrorException e2 = new ActiveMQIOErrorException();
+ e2.initCause(e);
+ logger.warn(e.getMessage(), e);
+ return getReadOnlyBodyBuffer();
+ }
+
+ return buffer;
+ }
+
+ private ActiveMQBuffer inflate(ActiveMQBuffer buffer) throws DataFormatException {
+ int bytesToRead = buffer.readableBytes();
+ Inflater inflater = new Inflater();
--- End diff --
The `Inflater` should be part of the parameter in order to allow future API usage allowing it to be pooled (it is not a lightweight object)
---
[GitHub] activemq-artemis pull request #1970: Artemis 1765 Large Mesasge fixes betwee...
Posted by franz1981 <gi...@git.apache.org>.
Github user franz1981 commented on a diff in the pull request:
https://github.com/apache/activemq-artemis/pull/1970#discussion_r176498277
--- Diff: artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java ---
@@ -213,6 +218,63 @@ public ActiveMQBuffer getReadOnlyBodyBuffer() {
return new ChannelBufferWrapper(buffer.slice(BODY_OFFSET, endOfBodyPosition - BUFFER_HEADER_SPACE).setIndex(0, endOfBodyPosition - BUFFER_HEADER_SPACE).asReadOnly());
}
+ /**
+ * This will return the proper buffer to represent the data of the Message. If compressed it will decompress.
+ * If large, it will read from the file or streaming.
+ * @return
+ * @throws ActiveMQException
+ */
+ @Override
+ public ActiveMQBuffer getDataBuffer() {
+
+ ActiveMQBuffer buffer;
+
+ try {
+ if (isLargeMessage()) {
+ LargeBodyEncoder encoder = getBodyEncoder();
+ encoder.open();
+ int bodySize = (int) encoder.getLargeBodySize();
+
+ buffer = new ChannelBufferWrapper(UnpooledByteBufAllocator.DEFAULT.heapBuffer(bodySize));
+
+ encoder.encode(buffer, bodySize);
+ encoder.close();
+ } else {
+ buffer = getReadOnlyBodyBuffer();
+ }
+
+ if (Boolean.TRUE.equals(getBooleanProperty(Message.HDR_LARGE_COMPRESSED))) {
+ buffer = inflate(buffer);
+ }
+ } catch (Exception e) {
+ ActiveMQIOErrorException e2 = new ActiveMQIOErrorException();
--- End diff --
the exception could be packed in separate method too, because is the uncommon path
---
[GitHub] activemq-artemis pull request #1970: Artemis 1765 Large Mesasge fixes betwee...
Posted by franz1981 <gi...@git.apache.org>.
Github user franz1981 commented on a diff in the pull request:
https://github.com/apache/activemq-artemis/pull/1970#discussion_r176509907
--- Diff: artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java ---
@@ -213,6 +218,63 @@ public ActiveMQBuffer getReadOnlyBodyBuffer() {
return new ChannelBufferWrapper(buffer.slice(BODY_OFFSET, endOfBodyPosition - BUFFER_HEADER_SPACE).setIndex(0, endOfBodyPosition - BUFFER_HEADER_SPACE).asReadOnly());
}
+ /**
+ * This will return the proper buffer to represent the data of the Message. If compressed it will decompress.
+ * If large, it will read from the file or streaming.
+ * @return
+ * @throws ActiveMQException
+ */
+ @Override
+ public ActiveMQBuffer getDataBuffer() {
+
+ ActiveMQBuffer buffer;
+
+ try {
+ if (isLargeMessage()) {
+ LargeBodyEncoder encoder = getBodyEncoder();
--- End diff --
I know, but `getDataBuffer` is used on OpenWire hot path and could be made smaller to help the JVM.
To help in these case would be good to create:
```
private ActiveMQBuffer getLargeMessageDataBuffer(){
//the code for the large message case
}
```
and let the caller method became smaller thanks to this refactoring. wdyt?
---
[GitHub] activemq-artemis pull request #1970: Artemis 1765 Large Mesasge fixes betwee...
Posted by franz1981 <gi...@git.apache.org>.
Github user franz1981 commented on a diff in the pull request:
https://github.com/apache/activemq-artemis/pull/1970#discussion_r176496551
--- Diff: artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java ---
@@ -213,6 +218,63 @@ public ActiveMQBuffer getReadOnlyBodyBuffer() {
return new ChannelBufferWrapper(buffer.slice(BODY_OFFSET, endOfBodyPosition - BUFFER_HEADER_SPACE).setIndex(0, endOfBodyPosition - BUFFER_HEADER_SPACE).asReadOnly());
}
+ /**
+ * This will return the proper buffer to represent the data of the Message. If compressed it will decompress.
+ * If large, it will read from the file or streaming.
+ * @return
+ * @throws ActiveMQException
+ */
+ @Override
+ public ActiveMQBuffer getDataBuffer() {
+
+ ActiveMQBuffer buffer;
+
+ try {
+ if (isLargeMessage()) {
+ LargeBodyEncoder encoder = getBodyEncoder();
+ encoder.open();
+ int bodySize = (int) encoder.getLargeBodySize();
+
+ buffer = new ChannelBufferWrapper(UnpooledByteBufAllocator.DEFAULT.heapBuffer(bodySize));
+
+ encoder.encode(buffer, bodySize);
+ encoder.close();
+ } else {
+ buffer = getReadOnlyBodyBuffer();
+ }
+
+ if (Boolean.TRUE.equals(getBooleanProperty(Message.HDR_LARGE_COMPRESSED))) {
+ buffer = inflate(buffer);
+ }
+ } catch (Exception e) {
+ ActiveMQIOErrorException e2 = new ActiveMQIOErrorException();
+ e2.initCause(e);
+ logger.warn(e.getMessage(), e);
+ return getReadOnlyBodyBuffer();
+ }
+
+ return buffer;
+ }
+
+ private ActiveMQBuffer inflate(ActiveMQBuffer buffer) throws DataFormatException {
+ int bytesToRead = buffer.readableBytes();
+ Inflater inflater = new Inflater();
+ inflater.setInput(ByteUtil.getActiveArray(buffer.readBytes(bytesToRead).toByteBuffer()));
--- End diff --
Is necessary to perform such many copies of the data?
Both `setInput` and `inflate` have range based versions `*(byte[] b, int off, int len)` and that could be used to reduce the number of copies.
---