You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/09/12 21:56:55 UTC

[1/2] activemq-artemis git commit: ARTEMIS-2083 Decode only the relevant portions of the message

Repository: activemq-artemis
Updated Branches:
  refs/heads/master 21190aabb -> 576f67c7c


ARTEMIS-2083 Decode only the relevant portions of the message

Ensure that the Body of the message is never decoded in the partial
decode phase of the message processing and also guard against the
decode of ApplicationProperties which should be done lazily.  Add
lazy decode of DeliveryAnnotations as they are not used at present.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/369e475a
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/369e475a
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/369e475a

Branch: refs/heads/master
Commit: 369e475af629d2159abae3a7b3e4162b924f7a3a
Parents: 21190aa
Author: Timothy Bish <ta...@gmail.com>
Authored: Wed Sep 12 12:42:45 2018 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Wed Sep 12 16:50:00 2018 -0400

----------------------------------------------------------------------
 .../protocol/amqp/broker/AMQPMessage.java       | 138 +++++++++----------
 .../protocol/amqp/message/AMQPMessageTest.java  | 137 +++++++++++++++++-
 2 files changed, 201 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/369e475a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index c0f9d10..cff5229 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -53,9 +53,9 @@ import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
 import org.apache.qpid.proton.amqp.messaging.Header;
 import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
 import org.apache.qpid.proton.amqp.messaging.Properties;
-import org.apache.qpid.proton.amqp.messaging.Section;
 import org.apache.qpid.proton.codec.DecoderImpl;
 import org.apache.qpid.proton.codec.ReadableBuffer;
+import org.apache.qpid.proton.codec.TypeConstructor;
 import org.apache.qpid.proton.codec.WritableBuffer;
 import org.apache.qpid.proton.message.Message;
 import org.apache.qpid.proton.message.impl.MessageImpl;
@@ -93,6 +93,7 @@ public class AMQPMessage extends RefCountMessage {
    private DeliveryAnnotations _deliveryAnnotations;
    private MessageAnnotations _messageAnnotations;
    private Properties _properties;
+   private int deliveryAnnotationsPosition = -1;
    private int appLocation = -1;
    private ApplicationProperties applicationProperties;
    private long scheduledTime = -1;
@@ -195,16 +196,30 @@ public class AMQPMessage extends RefCountMessage {
          buffer.position(appLocation);
          TLSEncode.getDecoder().setBuffer(buffer);
          Object section = TLSEncode.getDecoder().readObject();
-         if (section instanceof ApplicationProperties) {
-            this.applicationProperties = (ApplicationProperties) section;
-         }
-         this.appLocation = -1;
+         applicationProperties = (ApplicationProperties) section;
+         appLocation = -1;
          TLSEncode.getDecoder().setBuffer(null);
       }
 
       return applicationProperties;
    }
 
+   private DeliveryAnnotations getDeliveryAnnotations() {
+      parseHeaders();
+
+      if (_deliveryAnnotations == null && deliveryAnnotationsPosition >= 0) {
+         ReadableBuffer buffer = data.duplicate();
+         buffer.position(deliveryAnnotationsPosition);
+         TLSEncode.getDecoder().setBuffer(buffer);
+         Object section = TLSEncode.getDecoder().readObject();
+         _deliveryAnnotations = (DeliveryAnnotations) section;
+         deliveryAnnotationsPosition = -1;
+         TLSEncode.getDecoder().setBuffer(null);
+      }
+
+      return _deliveryAnnotations;
+   }
+
    private synchronized void parseHeaders() {
       if (!parsedHeaders) {
          if (data == null) {
@@ -380,83 +395,63 @@ public class AMQPMessage extends RefCountMessage {
       decoder.setBuffer(buffer.rewind());
 
       _header = null;
+      expiration = 0;
+      headerEnds = 0;
+      messagePaylodStart = 0;
       _deliveryAnnotations = null;
       _messageAnnotations = null;
       _properties = null;
       applicationProperties = null;
-      Section section = null;
+      appLocation = -1;
+      deliveryAnnotationsPosition = -1;
 
       try {
-         if (buffer.hasRemaining()) {
-            section = (Section) decoder.readObject();
-         }
-
-         if (section instanceof Header) {
-            _header = (Header) section;
-            headerEnds = buffer.position();
-            messagePaylodStart = headerEnds;
-            this.durable = _header.getDurable();
-
-            if (_header.getTtl() != null) {
-               this.expiration = System.currentTimeMillis() + _header.getTtl().intValue();
-            }
-
-            if (buffer.hasRemaining()) {
-               section = (Section) decoder.readObject();
-            } else {
-               section = null;
-            }
-
-         } else {
-            // meaning there is no header
-            headerEnds = 0;
-         }
-         if (section instanceof DeliveryAnnotations) {
-            _deliveryAnnotations = (DeliveryAnnotations) section;
-
-            // Advance the start beyond the delivery annotations so they are not written
-            // out on send of the message.
-            messagePaylodStart = buffer.position();
-
-            if (buffer.hasRemaining()) {
-               section = (Section) decoder.readObject();
-            } else {
-               section = null;
-            }
-         }
-         if (section instanceof MessageAnnotations) {
-            _messageAnnotations = (MessageAnnotations) section;
-
-            if (buffer.hasRemaining()) {
-               section = (Section) decoder.readObject();
-            } else {
-               section = null;
-            }
-         }
-         if (section instanceof Properties) {
-            _properties = (Properties) section;
-
-            if (_properties.getAbsoluteExpiryTime() != null && _properties.getAbsoluteExpiryTime().getTime() > 0) {
-               this.expiration = _properties.getAbsoluteExpiryTime().getTime();
-            }
-
-            // We don't read the next section on purpose, as we will parse ApplicationProperties
-            // lazily
-            section = null;
-         }
-
-         if (section instanceof ApplicationProperties) {
-            applicationProperties = (ApplicationProperties) section;
-         } else {
-            if (buffer.hasRemaining()) {
-               this.appLocation = buffer.position();
+         while (buffer.hasRemaining()) {
+            int constructorPos = buffer.position();
+            TypeConstructor<?> constructor = decoder.readConstructor();
+            if (Header.class.equals(constructor.getTypeClass())) {
+               _header = (Header) constructor.readValue();
+               headerEnds = messagePaylodStart = buffer.position();
+               durable = _header.getDurable();
+               if (_header.getTtl() != null) {
+                  expiration = System.currentTimeMillis() + _header.getTtl().intValue();
+               }
+            } else if (DeliveryAnnotations.class.equals(constructor.getTypeClass())) {
+               // Don't decode these as they are not used by the broker at all and are
+               // discarded on send, mark for lazy decode if ever needed.
+               constructor.skipValue();
+               deliveryAnnotationsPosition = constructorPos;
+               messagePaylodStart = buffer.position();
+            } else if (MessageAnnotations.class.equals(constructor.getTypeClass())) {
+               _messageAnnotations = (MessageAnnotations) constructor.readValue();
+            } else if (Properties.class.equals(constructor.getTypeClass())) {
+               _properties = (Properties) constructor.readValue();
+
+               if (_properties.getAbsoluteExpiryTime() != null && _properties.getAbsoluteExpiryTime().getTime() > 0) {
+                  expiration = _properties.getAbsoluteExpiryTime().getTime();
+               }
+
+               // Next is either Application Properties or the rest of the message, leave it for
+               // lazy decode of the ApplicationProperties should there be any.  Check first though
+               // as we don't want to actually decode the body which could be expensive.
+               if (buffer.hasRemaining()) {
+                  constructor = decoder.peekConstructor();
+                  if (ApplicationProperties.class.equals(constructor.getTypeClass())) {
+                     appLocation = buffer.position();
+                  }
+               }
+               break;
+            } else if (ApplicationProperties.class.equals(constructor.getTypeClass())) {
+               // Lazy decoding will start at the TypeConstructor of these ApplicationProperties
+               appLocation = constructorPos;
+               break;
             } else {
-               this.appLocation = -1;
+               break;
             }
          }
       } finally {
          decoder.setByteBuffer(null);
-         data.position(0);
+         buffer.position(0);
       }
    }
 
@@ -1082,6 +1077,7 @@ public class AMQPMessage extends RefCountMessage {
    public void reencode() {
       parseHeaders();
       getApplicationProperties();
+      getDeliveryAnnotations();
       if (_header != null) getProtonMessage().setHeader(_header);
       if (_deliveryAnnotations != null) getProtonMessage().setDeliveryAnnotations(_deliveryAnnotations);
       if (_messageAnnotations != null) getProtonMessage().setMessageAnnotations(_messageAnnotations);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/369e475a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/message/AMQPMessageTest.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/message/AMQPMessageTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/message/AMQPMessageTest.java
index 42ffaee..a6a29a0 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/message/AMQPMessageTest.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/message/AMQPMessageTest.java
@@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.nio.charset.StandardCharsets;
 import java.util.Date;
@@ -32,13 +33,20 @@ import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessagePersisterV2;
+import org.apache.activemq.artemis.protocol.amqp.util.NettyReadable;
 import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
+import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
 import org.apache.activemq.artemis.spi.core.protocol.EmbedMessageUtil;
 import org.apache.activemq.artemis.utils.RandomUtil;
+import org.apache.qpid.proton.amqp.UnsignedByte;
 import org.apache.qpid.proton.amqp.UnsignedInteger;
+import org.apache.qpid.proton.amqp.UnsignedLong;
 import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
 import org.apache.qpid.proton.amqp.messaging.Header;
 import org.apache.qpid.proton.amqp.messaging.Properties;
+import org.apache.qpid.proton.codec.EncoderImpl;
+import org.apache.qpid.proton.codec.EncodingCodes;
+import org.apache.qpid.proton.codec.ReadableBuffer;
 import org.apache.qpid.proton.message.Message;
 import org.apache.qpid.proton.message.impl.MessageImpl;
 import org.junit.Assert;
@@ -58,7 +66,7 @@ public class AMQPMessageTest {
       protonMessage.setProperties(properties);
       protonMessage.getHeader().setDeliveryCount(new UnsignedInteger(7));
       protonMessage.getHeader().setDurable(Boolean.TRUE);
-      protonMessage.setApplicationProperties(new ApplicationProperties(new HashMap()));
+      protonMessage.setApplicationProperties(new ApplicationProperties(new HashMap<>()));
 
       AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
 
@@ -76,7 +84,7 @@ public class AMQPMessageTest {
       protonMessage.setProperties(properties);
       protonMessage.getHeader().setDeliveryCount(new UnsignedInteger(7));
       protonMessage.getHeader().setDurable(Boolean.TRUE);
-      HashMap map = new HashMap();
+      HashMap<String, Object> map = new HashMap<>();
       map.put("key", "string1");
       protonMessage.setApplicationProperties(new ApplicationProperties(map));
 
@@ -97,7 +105,6 @@ public class AMQPMessageTest {
       assertEquals(true, newDecoded.getHeader().getDurable());
       assertEquals("newAddress", newDecoded.getAddress());
       assertEquals("string1", newDecoded.getObjectProperty("key"));
-
    }
 
    @Test
@@ -281,7 +288,131 @@ public class AMQPMessageTest {
          Assert.assertEquals("someAddress", readMessage.getAddress());
          Assert.assertArrayEquals(original, readMessage.getExtraBytesProperty(name));
       }
+   }
+
+   private static final UnsignedLong AMQPVALUE_DESCRIPTOR = UnsignedLong.valueOf(0x0000000000000077L);
+   private static final UnsignedLong APPLICATION_PROPERTIES_DESCRIPTOR = UnsignedLong.valueOf(0x0000000000000074L);
+   private static final UnsignedLong DELIVERY_ANNOTATIONS_DESCRIPTOR = UnsignedLong.valueOf(0x0000000000000071L);
+
+   @Test
+   public void testPartialDecodeIgnoresDeliveryAnnotationsByDefault() {
+      Header header = new Header();
+      header.setDurable(true);
+      header.setPriority(UnsignedByte.valueOf((byte) 6));
+
+      ByteBuf encodedBytes = Unpooled.buffer(1024);
+      NettyWritable writable = new NettyWritable(encodedBytes);
+
+      EncoderImpl encoder = TLSEncode.getEncoder();
+      encoder.setByteBuffer(writable);
+      encoder.writeObject(header);
+
+      // Signal a DeliveryAnnotations section but write corrupt underlying type info
+      encodedBytes.writeByte(EncodingCodes.DESCRIBED_TYPE_INDICATOR);
+      encodedBytes.writeByte(EncodingCodes.SMALLULONG);
+      encodedBytes.writeByte(DELIVERY_ANNOTATIONS_DESCRIPTOR.byteValue());
+      encodedBytes.writeByte(EncodingCodes.MAP8);
+      encodedBytes.writeByte(2);  // Size
+      encodedBytes.writeByte(2);  // Elements
+      // Use bad encoding code on underlying type of map key which will fail the decode if run
+      encodedBytes.writeByte(255);
+
+      ReadableBuffer readable = new NettyReadable(encodedBytes);
+
+      AMQPMessage message = null;
+      try {
+         message = new AMQPMessage(0, readable, null, null);
+      } catch (Exception decodeError) {
+         fail("Should not have encountered an exception on partial decode: " + decodeError.getMessage());
+      }
 
+      try {
+         // This should perform the lazy decode of the DeliveryAnnotations portion of the message
+         message.reencode();
+         fail("Should have thrown an error when attempting to decode the ApplicationProperties which are malformed.");
+      } catch (Exception ex) {
+         // Expected decode to fail when building full message.
+      }
+   }
+
+   @Test
+   public void testPartialDecodeIgnoresApplicationPropertiesByDefault() {
+      Header header = new Header();
+      header.setDurable(true);
+      header.setPriority(UnsignedByte.valueOf((byte) 6));
+
+      ByteBuf encodedBytes = Unpooled.buffer(1024);
+      NettyWritable writable = new NettyWritable(encodedBytes);
+
+      EncoderImpl encoder = TLSEncode.getEncoder();
+      encoder.setByteBuffer(writable);
+      encoder.writeObject(header);
+
+      // Signal an ApplicationProperties section but write corrupt underlying type info
+      encodedBytes.writeByte(EncodingCodes.DESCRIBED_TYPE_INDICATOR);
+      encodedBytes.writeByte(EncodingCodes.SMALLULONG);
+      encodedBytes.writeByte(APPLICATION_PROPERTIES_DESCRIPTOR.byteValue());
+      // Use bad encoding code on underlying type
+      encodedBytes.writeByte(255);
+
+      ReadableBuffer readable = new NettyReadable(encodedBytes);
+
+      AMQPMessage message = null;
+      try {
+         message = new AMQPMessage(0, readable, null, null);
+      } catch (Exception decodeError) {
+         fail("Should not have encountered an exception on partial decode: " + decodeError.getMessage());
+      }
+
+      assertTrue(message.isDurable());
+
+      try {
+         // This should perform the lazy decode of the ApplicationProperties portion of the message
+         message.getStringProperty("test");
+         fail("Should have thrown an error when attempting to decode the ApplicationProperties which are malformed.");
+      } catch (Exception ex) {
+         // Expected decode to fail when building full message.
+      }
+   }
+
+   @Test
+   public void testPartialDecodeIgnoresBodyByDefault() {
+      Header header = new Header();
+      header.setDurable(true);
+      header.setPriority(UnsignedByte.valueOf((byte) 6));
+
+      ByteBuf encodedBytes = Unpooled.buffer(1024);
+      NettyWritable writable = new NettyWritable(encodedBytes);
+
+      EncoderImpl encoder = TLSEncode.getEncoder();
+      encoder.setByteBuffer(writable);
+      encoder.writeObject(header);
+
+      // Signal body of AmqpValue but write corrupt underlying type info
+      encodedBytes.writeByte(EncodingCodes.DESCRIBED_TYPE_INDICATOR);
+      encodedBytes.writeByte(EncodingCodes.SMALLULONG);
+      encodedBytes.writeByte(AMQPVALUE_DESCRIPTOR.byteValue());
+      // Use bad encoding code on underlying type
+      encodedBytes.writeByte(255);
+
+      ReadableBuffer readable = new NettyReadable(encodedBytes);
+
+      AMQPMessage message = null;
+      try {
+         message = new AMQPMessage(0, readable, null, null);
+      } catch (Exception decodeError) {
+         fail("Should not have encountered an exception on partial decode: " + decodeError.getMessage());
+      }
+
+      assertTrue(message.isDurable());
+
+      try {
+         // This will decode the body section if present in order to present it as a Proton Message object
+         message.getProtonMessage();
+         fail("Should have thrown an error when attempting to decode the body which is malformed.");
+      } catch (Exception ex) {
+         // Expected decode to fail when building full message.
+      }
    }
 
    private AMQPMessage encodeAndDecodeMessage(MessageImpl message) {


[2/2] activemq-artemis git commit: This closes #2308

Posted by cl...@apache.org.
This closes #2308


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/576f67c7
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/576f67c7
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/576f67c7

Branch: refs/heads/master
Commit: 576f67c7c28a8f19371b60af1e30c722040ed3d8
Parents: 21190aa 369e475
Author: Clebert Suconic <cl...@apache.org>
Authored: Wed Sep 12 17:56:48 2018 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Sep 12 17:56:48 2018 -0400

----------------------------------------------------------------------
 .../protocol/amqp/broker/AMQPMessage.java       | 138 +++++++++----------
 .../protocol/amqp/message/AMQPMessageTest.java  | 137 +++++++++++++++++-
 2 files changed, 201 insertions(+), 74 deletions(-)
----------------------------------------------------------------------