You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/03/22 20:19:10 UTC

[3/4] activemq-artemis git commit: ARTEMIS-1765 Fixing Large Message Compression and Conversion

ARTEMIS-1765 Fixing Large Message Compression and Conversion


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/e86acd48
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/e86acd48
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/e86acd48

Branch: refs/heads/master
Commit: e86acd4824d3a94d124693fbd7887a1f1b9b072c
Parents: 51f105d
Author: Clebert Suconic <cl...@apache.org>
Authored: Thu Mar 22 10:40:59 2018 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Mar 22 16:11:11 2018 -0400

----------------------------------------------------------------------
 .../activemq/artemis/api/core/ICoreMessage.java |  7 +++
 .../artemis/core/message/impl/CoreMessage.java  | 65 ++++++++++++++++++++
 .../amqp/converter/jms/ServerJMSMessage.java    |  2 +-
 .../core/protocol/mqtt/MQTTPublishManager.java  |  4 +-
 .../core/protocol/mqtt/MQTTSessionCallback.java |  1 -
 .../protocol/mqtt/MQTTSubscriptionManager.java  |  2 +-
 .../openwire/OpenWireMessageConverter.java      |  2 +-
 .../core/protocol/stomp/StompSession.java       | 38 +-----------
 .../tests/integration/client/ConsumerTest.java  | 20 +++++-
 9 files changed, 96 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e86acd48/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ICoreMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ICoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ICoreMessage.java
index 179f8c5..f0eb1b6 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ICoreMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ICoreMessage.java
@@ -42,6 +42,13 @@ public interface ICoreMessage extends Message {
    ActiveMQBuffer getReadOnlyBodyBuffer();
 
    /**
+    * Returns a readOnlyBodyBuffer or a decompressed one if the message is compressed.
+    * or the large message buffer.
+    * @return
+    */
+   ActiveMQBuffer getDataBuffer();
+
+   /**
     * Return the type of the message
     */
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e86acd48/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
index 5ed46cd..0918828 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
@@ -21,8 +21,11 @@ import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.util.LinkedList;
 import java.util.Set;
+import java.util.zip.DataFormatException;
+import java.util.zip.Inflater;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.UnpooledByteBufAllocator;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
@@ -38,6 +41,7 @@ import org.apache.activemq.artemis.core.message.LargeBodyEncoder;
 import org.apache.activemq.artemis.core.persistence.Persister;
 import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
 import org.apache.activemq.artemis.reader.MessageUtil;
+import org.apache.activemq.artemis.utils.ByteUtil;
 import org.apache.activemq.artemis.utils.DataConstants;
 import org.apache.activemq.artemis.utils.UUID;
 import org.apache.activemq.artemis.utils.collections.TypedProperties;
@@ -213,6 +217,67 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
       return new ChannelBufferWrapper(buffer.slice(BODY_OFFSET, endOfBodyPosition - BUFFER_HEADER_SPACE).setIndex(0, endOfBodyPosition - BUFFER_HEADER_SPACE).asReadOnly());
    }
 
+   /**
+    * This will return the proper buffer to represent the data of the Message. If compressed it will decompress.
+    * If large, it will read from the file or streaming.
+    * @return
+    * @throws ActiveMQException
+    */
+   @Override
+   public ActiveMQBuffer getDataBuffer() {
+
+      ActiveMQBuffer buffer;
+
+      try {
+         if (isLargeMessage()) {
+            buffer = getLargeMessageBuffer();
+         } else {
+            buffer = getReadOnlyBodyBuffer();
+         }
+
+         if (Boolean.TRUE.equals(getBooleanProperty(Message.HDR_LARGE_COMPRESSED))) {
+            buffer = inflate(buffer);
+         }
+      } catch (Exception e) {
+         logger.warn(e.getMessage(), e);
+         return getReadOnlyBodyBuffer();
+      }
+
+      return buffer;
+   }
+
+   private ActiveMQBuffer getLargeMessageBuffer() throws ActiveMQException {
+      ActiveMQBuffer buffer;
+      LargeBodyEncoder encoder = getBodyEncoder();
+      encoder.open();
+      int bodySize = (int) encoder.getLargeBodySize();
+
+      buffer = new ChannelBufferWrapper(UnpooledByteBufAllocator.DEFAULT.heapBuffer(bodySize));
+
+      encoder.encode(buffer, bodySize);
+      encoder.close();
+      return buffer;
+   }
+
+   private ActiveMQBuffer inflate(ActiveMQBuffer buffer) throws DataFormatException {
+      int bytesToRead = buffer.readableBytes();
+      Inflater inflater = new Inflater();
+      inflater.setInput(ByteUtil.getActiveArray(buffer.readBytes(bytesToRead).toByteBuffer()));
+
+      //get the real size of large message
+      long sizeBody = getLongProperty(Message.HDR_LARGE_BODY_SIZE);
+
+      byte[] data = new byte[(int) sizeBody];
+      inflater.inflate(data);
+      inflater.end();
+      ActiveMQBuffer qbuff = ActiveMQBuffers.wrappedBuffer(data);
+      qbuff.resetReaderIndex();
+      qbuff.resetWriterIndex();
+      qbuff.writeBytes(data);
+      buffer = qbuff;
+      return buffer;
+   }
+
    @Override
    public SimpleString getGroupID() {
       return this.getSimpleStringProperty(Message.HDR_GROUP_ID);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e86acd48/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java
index b070591..0ef2041 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java
@@ -73,7 +73,7 @@ public class ServerJMSMessage implements Message {
    protected ActiveMQBuffer getReadBodyBuffer() {
       if (readBodyBuffer == null) {
          // to avoid clashes between multiple threads
-         readBodyBuffer = message.getReadOnlyBodyBuffer();
+         readBodyBuffer = message.getDataBuffer();
       }
       return readBodyBuffer;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e86acd48/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
index febc364..667f5be 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
@@ -269,7 +269,7 @@ public class MQTTPublishManager {
       switch (message.getType()) {
          case Message.TEXT_TYPE:
             try {
-               SimpleString text = message.getReadOnlyBodyBuffer().readNullableSimpleString();
+               SimpleString text = message.getDataBuffer().readNullableSimpleString();
                byte[] stringPayload = text.toString().getBytes("UTF-8");
                payload = ByteBufAllocator.DEFAULT.buffer(stringPayload.length);
                payload.writeBytes(stringPayload);
@@ -278,7 +278,7 @@ public class MQTTPublishManager {
                log.warn("Unable to send message: " + message.getMessageID() + " Cause: " + e.getMessage(), e);
             }
          default:
-            ActiveMQBuffer bodyBuffer = message.getReadOnlyBodyBuffer();
+            ActiveMQBuffer bodyBuffer = message.getDataBuffer();
             payload = ByteBufAllocator.DEFAULT.buffer(bodyBuffer.writerIndex());
             payload.writeBytes(bodyBuffer.byteBuf());
             break;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e86acd48/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
index 39e2ba9..a49cf11 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
@@ -64,7 +64,6 @@ public class MQTTSessionCallback implements SessionCallback {
                                            byte[] body,
                                            boolean continues,
                                            boolean requiresResponse) {
-      log.warn("Sending LARGE MESSAGE");
       return 1;
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e86acd48/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
index 49ab5d9..4093f5e 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
@@ -154,7 +154,7 @@ public class MQTTSubscriptionManager {
     */
    private void createConsumerForSubscriptionQueue(Queue queue, String topic, int qos) throws Exception {
       long cid = session.getServer().getStorageManager().generateID();
-      ServerConsumer consumer = session.getServerSession().createConsumer(cid, queue.getName(), null, false, true, -1);
+      ServerConsumer consumer = session.getServerSession().createConsumer(cid, queue.getName(), null, false, false, -1);
       consumer.setStarted(true);
 
       consumers.put(topic, consumer);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e86acd48/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
index 66fff2d..fd1f732 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
@@ -513,7 +513,7 @@ public final class OpenWireMessageConverter {
       final Boolean compressProp = (Boolean) coreMessage.getObjectProperty(AMQ_MSG_COMPRESSED);
       final boolean isCompressed = compressProp == null ? false : compressProp.booleanValue();
       final byte[] bytes;
-      final ActiveMQBuffer buffer = coreMessage.getReadOnlyBodyBuffer();
+      final ActiveMQBuffer buffer = coreMessage.getDataBuffer();
       buffer.resetReaderIndex();
 
       switch (coreType) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e86acd48/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
index 0a12b47..8a573e6 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
@@ -23,17 +23,13 @@ import java.util.Set;
 import java.util.concurrent.BlockingDeque;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingDeque;
-import java.util.zip.Inflater;
 
-import io.netty.buffer.UnpooledByteBufAllocator;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
-import org.apache.activemq.artemis.core.message.LargeBodyEncoder;
 import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.core.persistence.OperationContext;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
@@ -47,7 +43,6 @@ import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
 import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
-import org.apache.activemq.artemis.utils.ByteUtil;
 import org.apache.activemq.artemis.utils.ConfigurationHelper;
 import org.apache.activemq.artemis.utils.PendingTask;
 import org.apache.activemq.artemis.utils.UUIDGenerator;
@@ -142,38 +137,7 @@ public class StompSession implements SessionCallback {
          if (subscription == null)
             return 0;
          StompFrame frame;
-         ActiveMQBuffer buffer;
-
-         if (coreMessage.isLargeMessage()) {
-            LargeBodyEncoder encoder = coreMessage.getBodyEncoder();
-            encoder.open();
-            int bodySize = (int) encoder.getLargeBodySize();
-
-            buffer = new ChannelBufferWrapper(UnpooledByteBufAllocator.DEFAULT.heapBuffer(bodySize));
-
-            encoder.encode(buffer, bodySize);
-            encoder.close();
-         } else {
-            buffer = coreMessage.getReadOnlyBodyBuffer();
-         }
-
-         if (Boolean.TRUE.equals(serverMessage.getBooleanProperty(Message.HDR_LARGE_COMPRESSED))) {
-            ActiveMQBuffer qbuff = buffer;
-            int bytesToRead = qbuff.readerIndex();
-            Inflater inflater = new Inflater();
-            inflater.setInput(ByteUtil.getActiveArray(qbuff.readBytes(bytesToRead).toByteBuffer()));
-
-            //get the real size of large message
-            long sizeBody = newServerMessage.getLongProperty(Message.HDR_LARGE_BODY_SIZE);
-
-            byte[] data = new byte[(int) sizeBody];
-            inflater.inflate(data);
-            inflater.end();
-            qbuff.resetReaderIndex();
-            qbuff.resetWriterIndex();
-            qbuff.writeBytes(data);
-            buffer = qbuff;
-         }
+         ActiveMQBuffer buffer = coreMessage.getDataBuffer();
 
          frame = connection.createStompMessage(newServerMessage, buffer, subscription, deliveryCount);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e86acd48/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
index 0b36e18..f103d41 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
@@ -421,7 +421,10 @@ public class ConsumerTest extends ActiveMQTestBase {
 
    private ConnectionFactory createFactory(int protocol) {
       switch (protocol) {
-         case 1: return new ActiveMQConnectionFactory();// core protocol
+         case 1: ActiveMQConnectionFactory coreCF = new ActiveMQConnectionFactory();// core protocol
+            coreCF.setCompressLargeMessage(true);
+            coreCF.setMinLargeMessageSize(10 * 1024);
+            return coreCF;
          case 2: return new JmsConnectionFactory("amqp://localhost:61616"); // amqp
          case 3: return new org.apache.activemq.ActiveMQConnectionFactory("tcp://localhost:61616"); // openwire
          default: return null;
@@ -446,7 +449,15 @@ public class ConsumerTest extends ActiveMQTestBase {
          TextMessage msg = session.createTextMessage("hello");
          msg.setIntProperty("mycount", 0);
          producer.send(msg);
-         connection.close();
+
+         StringBuffer bufferLarge = new StringBuffer();
+         while (bufferLarge.length() < 100 * 1024) {
+            bufferLarge.append("          ");
+         }
+
+         msg = session.createTextMessage(bufferLarge.toString());
+         msg.setIntProperty("mycount", 1);
+         producer.send(msg);
 
          connection = factoryConsume.createConnection();
          session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -461,6 +472,11 @@ public class ConsumerTest extends ActiveMQTestBase {
          Assert.assertEquals(0, message.getIntProperty("mycount"));
          Assert.assertEquals("hello", message.getText());
 
+         message = (TextMessage) consumer.receive(1000);
+         Assert.assertNotNull(message);
+         Assert.assertEquals(1, message.getIntProperty("mycount"));
+         Assert.assertEquals(bufferLarge.toString(), message.getText());
+
          Wait.waitFor(() -> server.getPagingManager().getGlobalSize() == 0, 5000, 100);
          Assert.assertEquals(0, server.getPagingManager().getGlobalSize());