You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/03/22 20:19:10 UTC
[3/4] activemq-artemis git commit: ARTEMIS-1765 Fixing Large Message
Compression and Conversion
ARTEMIS-1765 Fixing Large Message Compression and Conversion
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/e86acd48
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/e86acd48
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/e86acd48
Branch: refs/heads/master
Commit: e86acd4824d3a94d124693fbd7887a1f1b9b072c
Parents: 51f105d
Author: Clebert Suconic <cl...@apache.org>
Authored: Thu Mar 22 10:40:59 2018 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Mar 22 16:11:11 2018 -0400
----------------------------------------------------------------------
.../activemq/artemis/api/core/ICoreMessage.java | 7 +++
.../artemis/core/message/impl/CoreMessage.java | 65 ++++++++++++++++++++
.../amqp/converter/jms/ServerJMSMessage.java | 2 +-
.../core/protocol/mqtt/MQTTPublishManager.java | 4 +-
.../core/protocol/mqtt/MQTTSessionCallback.java | 1 -
.../protocol/mqtt/MQTTSubscriptionManager.java | 2 +-
.../openwire/OpenWireMessageConverter.java | 2 +-
.../core/protocol/stomp/StompSession.java | 38 +-----------
.../tests/integration/client/ConsumerTest.java | 20 +++++-
9 files changed, 96 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e86acd48/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ICoreMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ICoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ICoreMessage.java
index 179f8c5..f0eb1b6 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ICoreMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ICoreMessage.java
@@ -42,6 +42,13 @@ public interface ICoreMessage extends Message {
ActiveMQBuffer getReadOnlyBodyBuffer();
/**
+ * Returns a readOnlyBodyBuffer or a decompressed one if the message is compressed.
+ * or the large message buffer.
+ * @return
+ */
+ ActiveMQBuffer getDataBuffer();
+
+ /**
* Return the type of the message
*/
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e86acd48/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
index 5ed46cd..0918828 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
@@ -21,8 +21,11 @@ import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.Set;
+import java.util.zip.DataFormatException;
+import java.util.zip.Inflater;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.UnpooledByteBufAllocator;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ActiveMQException;
@@ -38,6 +41,7 @@ import org.apache.activemq.artemis.core.message.LargeBodyEncoder;
import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.reader.MessageUtil;
+import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.artemis.utils.DataConstants;
import org.apache.activemq.artemis.utils.UUID;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
@@ -213,6 +217,67 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
return new ChannelBufferWrapper(buffer.slice(BODY_OFFSET, endOfBodyPosition - BUFFER_HEADER_SPACE).setIndex(0, endOfBodyPosition - BUFFER_HEADER_SPACE).asReadOnly());
}
+ /**
+ * This will return the proper buffer to represent the data of the Message. If compressed it will decompress.
+ * If large, it will read from the file or streaming.
+ * @return
+ * @throws ActiveMQException
+ */
+ @Override
+ public ActiveMQBuffer getDataBuffer() {
+
+ ActiveMQBuffer buffer;
+
+ try {
+ if (isLargeMessage()) {
+ buffer = getLargeMessageBuffer();
+ } else {
+ buffer = getReadOnlyBodyBuffer();
+ }
+
+ if (Boolean.TRUE.equals(getBooleanProperty(Message.HDR_LARGE_COMPRESSED))) {
+ buffer = inflate(buffer);
+ }
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ return getReadOnlyBodyBuffer();
+ }
+
+ return buffer;
+ }
+
+ private ActiveMQBuffer getLargeMessageBuffer() throws ActiveMQException {
+ ActiveMQBuffer buffer;
+ LargeBodyEncoder encoder = getBodyEncoder();
+ encoder.open();
+ int bodySize = (int) encoder.getLargeBodySize();
+
+ buffer = new ChannelBufferWrapper(UnpooledByteBufAllocator.DEFAULT.heapBuffer(bodySize));
+
+ encoder.encode(buffer, bodySize);
+ encoder.close();
+ return buffer;
+ }
+
+ private ActiveMQBuffer inflate(ActiveMQBuffer buffer) throws DataFormatException {
+ int bytesToRead = buffer.readableBytes();
+ Inflater inflater = new Inflater();
+ inflater.setInput(ByteUtil.getActiveArray(buffer.readBytes(bytesToRead).toByteBuffer()));
+
+ //get the real size of large message
+ long sizeBody = getLongProperty(Message.HDR_LARGE_BODY_SIZE);
+
+ byte[] data = new byte[(int) sizeBody];
+ inflater.inflate(data);
+ inflater.end();
+ ActiveMQBuffer qbuff = ActiveMQBuffers.wrappedBuffer(data);
+ qbuff.resetReaderIndex();
+ qbuff.resetWriterIndex();
+ qbuff.writeBytes(data);
+ buffer = qbuff;
+ return buffer;
+ }
+
@Override
public SimpleString getGroupID() {
return this.getSimpleStringProperty(Message.HDR_GROUP_ID);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e86acd48/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java
index b070591..0ef2041 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java
@@ -73,7 +73,7 @@ public class ServerJMSMessage implements Message {
protected ActiveMQBuffer getReadBodyBuffer() {
if (readBodyBuffer == null) {
// to avoid clashes between multiple threads
- readBodyBuffer = message.getReadOnlyBodyBuffer();
+ readBodyBuffer = message.getDataBuffer();
}
return readBodyBuffer;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e86acd48/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
index febc364..667f5be 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
@@ -269,7 +269,7 @@ public class MQTTPublishManager {
switch (message.getType()) {
case Message.TEXT_TYPE:
try {
- SimpleString text = message.getReadOnlyBodyBuffer().readNullableSimpleString();
+ SimpleString text = message.getDataBuffer().readNullableSimpleString();
byte[] stringPayload = text.toString().getBytes("UTF-8");
payload = ByteBufAllocator.DEFAULT.buffer(stringPayload.length);
payload.writeBytes(stringPayload);
@@ -278,7 +278,7 @@ public class MQTTPublishManager {
log.warn("Unable to send message: " + message.getMessageID() + " Cause: " + e.getMessage(), e);
}
default:
- ActiveMQBuffer bodyBuffer = message.getReadOnlyBodyBuffer();
+ ActiveMQBuffer bodyBuffer = message.getDataBuffer();
payload = ByteBufAllocator.DEFAULT.buffer(bodyBuffer.writerIndex());
payload.writeBytes(bodyBuffer.byteBuf());
break;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e86acd48/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
index 39e2ba9..a49cf11 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
@@ -64,7 +64,6 @@ public class MQTTSessionCallback implements SessionCallback {
byte[] body,
boolean continues,
boolean requiresResponse) {
- log.warn("Sending LARGE MESSAGE");
return 1;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e86acd48/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
index 49ab5d9..4093f5e 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
@@ -154,7 +154,7 @@ public class MQTTSubscriptionManager {
*/
private void createConsumerForSubscriptionQueue(Queue queue, String topic, int qos) throws Exception {
long cid = session.getServer().getStorageManager().generateID();
- ServerConsumer consumer = session.getServerSession().createConsumer(cid, queue.getName(), null, false, true, -1);
+ ServerConsumer consumer = session.getServerSession().createConsumer(cid, queue.getName(), null, false, false, -1);
consumer.setStarted(true);
consumers.put(topic, consumer);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e86acd48/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
index 66fff2d..fd1f732 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
@@ -513,7 +513,7 @@ public final class OpenWireMessageConverter {
final Boolean compressProp = (Boolean) coreMessage.getObjectProperty(AMQ_MSG_COMPRESSED);
final boolean isCompressed = compressProp == null ? false : compressProp.booleanValue();
final byte[] bytes;
- final ActiveMQBuffer buffer = coreMessage.getReadOnlyBodyBuffer();
+ final ActiveMQBuffer buffer = coreMessage.getDataBuffer();
buffer.resetReaderIndex();
switch (coreType) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e86acd48/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
index 0a12b47..8a573e6 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
@@ -23,17 +23,13 @@ import java.util.Set;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingDeque;
-import java.util.zip.Inflater;
-import io.netty.buffer.UnpooledByteBufAllocator;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
-import org.apache.activemq.artemis.core.message.LargeBodyEncoder;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.StorageManager;
@@ -47,7 +43,6 @@ import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
-import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.artemis.utils.ConfigurationHelper;
import org.apache.activemq.artemis.utils.PendingTask;
import org.apache.activemq.artemis.utils.UUIDGenerator;
@@ -142,38 +137,7 @@ public class StompSession implements SessionCallback {
if (subscription == null)
return 0;
StompFrame frame;
- ActiveMQBuffer buffer;
-
- if (coreMessage.isLargeMessage()) {
- LargeBodyEncoder encoder = coreMessage.getBodyEncoder();
- encoder.open();
- int bodySize = (int) encoder.getLargeBodySize();
-
- buffer = new ChannelBufferWrapper(UnpooledByteBufAllocator.DEFAULT.heapBuffer(bodySize));
-
- encoder.encode(buffer, bodySize);
- encoder.close();
- } else {
- buffer = coreMessage.getReadOnlyBodyBuffer();
- }
-
- if (Boolean.TRUE.equals(serverMessage.getBooleanProperty(Message.HDR_LARGE_COMPRESSED))) {
- ActiveMQBuffer qbuff = buffer;
- int bytesToRead = qbuff.readerIndex();
- Inflater inflater = new Inflater();
- inflater.setInput(ByteUtil.getActiveArray(qbuff.readBytes(bytesToRead).toByteBuffer()));
-
- //get the real size of large message
- long sizeBody = newServerMessage.getLongProperty(Message.HDR_LARGE_BODY_SIZE);
-
- byte[] data = new byte[(int) sizeBody];
- inflater.inflate(data);
- inflater.end();
- qbuff.resetReaderIndex();
- qbuff.resetWriterIndex();
- qbuff.writeBytes(data);
- buffer = qbuff;
- }
+ ActiveMQBuffer buffer = coreMessage.getDataBuffer();
frame = connection.createStompMessage(newServerMessage, buffer, subscription, deliveryCount);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e86acd48/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
index 0b36e18..f103d41 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
@@ -421,7 +421,10 @@ public class ConsumerTest extends ActiveMQTestBase {
private ConnectionFactory createFactory(int protocol) {
switch (protocol) {
- case 1: return new ActiveMQConnectionFactory();// core protocol
+ case 1: ActiveMQConnectionFactory coreCF = new ActiveMQConnectionFactory();// core protocol
+ coreCF.setCompressLargeMessage(true);
+ coreCF.setMinLargeMessageSize(10 * 1024);
+ return coreCF;
case 2: return new JmsConnectionFactory("amqp://localhost:61616"); // amqp
case 3: return new org.apache.activemq.ActiveMQConnectionFactory("tcp://localhost:61616"); // openwire
default: return null;
@@ -446,7 +449,15 @@ public class ConsumerTest extends ActiveMQTestBase {
TextMessage msg = session.createTextMessage("hello");
msg.setIntProperty("mycount", 0);
producer.send(msg);
- connection.close();
+
+ StringBuffer bufferLarge = new StringBuffer();
+ while (bufferLarge.length() < 100 * 1024) {
+ bufferLarge.append(" ");
+ }
+
+ msg = session.createTextMessage(bufferLarge.toString());
+ msg.setIntProperty("mycount", 1);
+ producer.send(msg);
connection = factoryConsume.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -461,6 +472,11 @@ public class ConsumerTest extends ActiveMQTestBase {
Assert.assertEquals(0, message.getIntProperty("mycount"));
Assert.assertEquals("hello", message.getText());
+ message = (TextMessage) consumer.receive(1000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals(1, message.getIntProperty("mycount"));
+ Assert.assertEquals(bufferLarge.toString(), message.getText());
+
Wait.waitFor(() -> server.getPagingManager().getGlobalSize() == 0, 5000, 100);
Assert.assertEquals(0, server.getPagingManager().getGlobalSize());