You are viewing a plain text version of this content. The canonical link for it is here.
Posted to gitbox@activemq.apache.org by GitBox <gi...@apache.org> on 2021/10/19 10:47:48 UTC

[GitHub] [activemq-artemis] gemmellr commented on a change in pull request #3796: ARTEMIS-3461 Generalize MBean Support on Messages and avoid converstion to core on AMQP Messages on console browsing

gemmellr commented on a change in pull request #3796:
URL: https://github.com/apache/activemq-artemis/pull/3796#discussion_r731678276



##########
File path: artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
##########
@@ -1290,4 +1301,88 @@ public String getStringBody() {
 
       return body;
    }
+
+
+   // *******************************************************************************************************************************
+   // Composite Data implementation
+
+   private static MessageOpenTypeFactory TEXT_FACTORY = new TextMessageOpenTypeFactory();
+   private static MessageOpenTypeFactory BYTES_FACTORY = new BytesMessageOpenTypeFactory();
+
+
+   @Override
+   public CompositeData toCompositeData(int fieldsLimit, int deliveryCount) throws OpenDataException {
+      CompositeType ct;
+      Map<String, Object> fields;
+      byte type = getType();
+      switch (type) {
+         case Message.TEXT_TYPE:
+            ct = TEXT_FACTORY.getCompositeType();
+            fields = TEXT_FACTORY.getFields(this, fieldsLimit, deliveryCount);
+            break;
+         default:
+            ct = BYTES_FACTORY.getCompositeType();
+            fields = BYTES_FACTORY.getFields(this, fieldsLimit, deliveryCount);
+            break;
+      }
+      return new CompositeDataSupport(ct, fields);
+
+   }
+
+   static class BytesMessageOpenTypeFactory extends MessageOpenTypeFactory<CoreMessage> {
+      protected ArrayType body;
+
+      @Override
+      protected void init() throws OpenDataException {
+         super.init();
+         body = new ArrayType(SimpleType.BYTE, true);
+         addItem(CompositeDataConstants.TYPE, CompositeDataConstants.TYPE_DESCRIPTION, SimpleType.BYTE);
+         addItem(CompositeDataConstants.BODY, CompositeDataConstants.BODY_DESCRIPTION, body);
+      }
+
+      @Override
+      public Map<String, Object> getFields(CoreMessage m, int valueSizeLimit, int delivery) throws OpenDataException {
+         Map<String, Object> rc = super.getFields(m, valueSizeLimit, delivery);
+         rc.put(CompositeDataConstants.TYPE, m.getType());
+         if (!m.isLargeMessage()) {
+            ActiveMQBuffer bodyCopy = m.toCore().getReadOnlyBodyBuffer();

Review comment:
       Unnecessary call to toCore on what is already a CoreMessage instance.

##########
File path: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
##########
@@ -1726,4 +1791,103 @@ public Object getOwner() {
    public void setOwner(Object object) {
       this.owner = object;
    }
+
+
+   // *******************************************************************************************************************************
+   // Composite Data implementation
+
+   private static MessageOpenTypeFactory AMQP_FACTORY = new AmqpMessageOpenTypeFactory();
+
+   static class AmqpMessageOpenTypeFactory extends MessageOpenTypeFactory<AMQPMessage> {
+      @Override
+      protected void init() throws OpenDataException {
+         super.init();
+         addItem(CompositeDataConstants.TEXT_BODY, CompositeDataConstants.TEXT_BODY, SimpleType.STRING);
+         addItem(CompositeDataConstants.TYPE, CompositeDataConstants.TYPE_DESCRIPTION, SimpleType.BYTE);
+      }
+
+      @Override
+      public Map<String, Object> getFields(AMQPMessage m, int valueSizeLimit, int delivery) throws OpenDataException {
+         Map<String, Object> rc = super.getFields(m, valueSizeLimit, delivery);
+
+         if (!m.isLargeMessage()) {
+            m.ensureScanning();
+         }
+
+         Properties properties = m.getCurrentProperties();
+
+         byte type = getType(m, properties);
+
+         rc.put(CompositeDataConstants.TYPE, type);
+
+         if (m.isLargeMessage())  {
+            rc.put(CompositeDataConstants.TEXT_BODY, "... Large message ...");
+         } else {
+            if (m.getBody() instanceof AmqpValue) {
+               Object amqpValue = ((AmqpValue) m.getBody()).getValue();
+
+               rc.put(CompositeDataConstants.TEXT_BODY, JsonUtil.truncateString(amqpValue.toString(), valueSizeLimit));
+            } else {
+               rc.put(CompositeDataConstants.TEXT_BODY, JsonUtil.truncateString("" + m.getBody(), valueSizeLimit));

Review comment:
       String.valueOf(..) would seem less ugly than a concat.

##########
File path: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
##########
@@ -1726,4 +1791,103 @@ public Object getOwner() {
    public void setOwner(Object object) {
       this.owner = object;
    }
+
+
+   // *******************************************************************************************************************************
+   // Composite Data implementation
+
+   private static MessageOpenTypeFactory AMQP_FACTORY = new AmqpMessageOpenTypeFactory();
+
+   static class AmqpMessageOpenTypeFactory extends MessageOpenTypeFactory<AMQPMessage> {
+      @Override
+      protected void init() throws OpenDataException {
+         super.init();
+         addItem(CompositeDataConstants.TEXT_BODY, CompositeDataConstants.TEXT_BODY, SimpleType.STRING);
+         addItem(CompositeDataConstants.TYPE, CompositeDataConstants.TYPE_DESCRIPTION, SimpleType.BYTE);
+      }
+
+      @Override
+      public Map<String, Object> getFields(AMQPMessage m, int valueSizeLimit, int delivery) throws OpenDataException {
+         Map<String, Object> rc = super.getFields(m, valueSizeLimit, delivery);
+
+         if (!m.isLargeMessage()) {
+            m.ensureScanning();
+         }
+
+         Properties properties = m.getCurrentProperties();
+
+         byte type = getType(m, properties);
+
+         rc.put(CompositeDataConstants.TYPE, type);
+
+         if (m.isLargeMessage())  {
+            rc.put(CompositeDataConstants.TEXT_BODY, "... Large message ...");
+         } else {
+            if (m.getBody() instanceof AmqpValue) {
+               Object amqpValue = ((AmqpValue) m.getBody()).getValue();
+
+               rc.put(CompositeDataConstants.TEXT_BODY, JsonUtil.truncateString(amqpValue.toString(), valueSizeLimit));

Review comment:
       An amqp-value section is allowed to and does in cases contain an encoded null. This could NPE.

##########
File path: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
##########
@@ -834,9 +844,64 @@ public final void receiveBuffer(ByteBuf buffer) {
          value = JsonUtil.truncate(value, valueSizeLimit);
          map.put(name.toString(), value);
       }
+
+      TypedProperties extraProperties = getExtraProperties();
+      if (extraProperties != null) {
+         extraProperties.forEach((s, o) -> {
+            map.put(s.toString(), JsonUtil.truncate(o.toString(), valueSizeLimit));
+         });
+      }
+      if (!isLargeMessage()) {
+         addAnnotationsAsProperties(map, messageAnnotations);
+      }
+
+      if (properties != null) {
+         if (properties.getContentType() != null) {
+            map.put("properties.getContentType()", properties.getContentType().toString());

Review comment:
       Seems odd using "properties.<methodNames()>" syntax to add items with explicit field names here, after using "annotation <annotation-name>" just above.

##########
File path: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
##########
@@ -834,9 +844,64 @@ public final void receiveBuffer(ByteBuf buffer) {
          value = JsonUtil.truncate(value, valueSizeLimit);
          map.put(name.toString(), value);
       }
+
+      TypedProperties extraProperties = getExtraProperties();
+      if (extraProperties != null) {
+         extraProperties.forEach((s, o) -> {
+            map.put(s.toString(), JsonUtil.truncate(o.toString(), valueSizeLimit));
+         });
+      }
+      if (!isLargeMessage()) {
+         addAnnotationsAsProperties(map, messageAnnotations);
+      }
+
+      if (properties != null) {
+         if (properties.getContentType() != null) {
+            map.put("properties.getContentType()", properties.getContentType().toString());
+         }
+         if (properties.getContentEncoding() != null) {
+            map.put("properties.getContentEncoding()", properties.getContentEncoding().toString());
+         }
+         if (properties.getGroupId() != null) {
+            map.put("properties.getGroupID()", properties.getGroupId());
+         }
+         if (properties.getGroupSequence() != null) {
+            map.put("properties.getGroupSequence()", properties.getGroupSequence().intValue());
+         }
+         if (properties.getReplyToGroupId() != null) {
+            map.put("properties.getReplyToGroupId()", properties.getReplyToGroupId());
+         }
+      }
+
       return map;
    }
 
+
+   protected static void addAnnotationsAsProperties(Map map, MessageAnnotations annotations) {
+      if (annotations != null && annotations.getValue() != null) {
+         for (Map.Entry<?, ?> entry : annotations.getValue().entrySet()) {
+            String key = entry.getKey().toString();
+            if ("x-opt-delivery-time".equals(key) && entry.getValue() != null) {
+               long deliveryTime = ((Number) entry.getValue()).longValue();
+               map.put("annotation x-opt-delivery-time", deliveryTime);
+            } else if ("x-opt-delivery-delay".equals(key) && entry.getValue() != null) {
+               long delay = ((Number) entry.getValue()).longValue();
+               if (delay > 0) {
+                  map.put("annotation x-opt-delivery-delay", System.currentTimeMillis() + delay);

Review comment:
       This seems wrong, its value is a constant and shouldn't be changing. Its certainly not indicating a delay from the time this method is called.

##########
File path: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
##########
@@ -1726,4 +1791,103 @@ public Object getOwner() {
    public void setOwner(Object object) {
       this.owner = object;
    }
+
+
+   // *******************************************************************************************************************************
+   // Composite Data implementation
+
+   private static MessageOpenTypeFactory AMQP_FACTORY = new AmqpMessageOpenTypeFactory();
+
+   static class AmqpMessageOpenTypeFactory extends MessageOpenTypeFactory<AMQPMessage> {
+      @Override
+      protected void init() throws OpenDataException {
+         super.init();
+         addItem(CompositeDataConstants.TEXT_BODY, CompositeDataConstants.TEXT_BODY, SimpleType.STRING);
+         addItem(CompositeDataConstants.TYPE, CompositeDataConstants.TYPE_DESCRIPTION, SimpleType.BYTE);
+      }
+
+      @Override
+      public Map<String, Object> getFields(AMQPMessage m, int valueSizeLimit, int delivery) throws OpenDataException {
+         Map<String, Object> rc = super.getFields(m, valueSizeLimit, delivery);
+
+         if (!m.isLargeMessage()) {
+            m.ensureScanning();
+         }
+
+         Properties properties = m.getCurrentProperties();
+
+         byte type = getType(m, properties);
+
+         rc.put(CompositeDataConstants.TYPE, type);
+
+         if (m.isLargeMessage())  {
+            rc.put(CompositeDataConstants.TEXT_BODY, "... Large message ...");
+         } else {
+            if (m.getBody() instanceof AmqpValue) {
+               Object amqpValue = ((AmqpValue) m.getBody()).getValue();
+
+               rc.put(CompositeDataConstants.TEXT_BODY, JsonUtil.truncateString(amqpValue.toString(), valueSizeLimit));
+            } else {
+               rc.put(CompositeDataConstants.TEXT_BODY, JsonUtil.truncateString("" + m.getBody(), valueSizeLimit));
+            }
+         }
+
+         return rc;
+      }
+
+      private byte getType(AMQPMessage m, Properties properties) {
+         if (m.isLargeMessage()) {
+            return DEFAULT_TYPE;
+         }
+         byte type = BYTES_TYPE;
+
+         final Symbol contentType = properties != null ? properties.getContentType() : null;
+         final String contentTypeString = contentType != null ? contentType.toString() : null;
+
+         if (m.getBody() instanceof Data) {
+
+            if (contentType.equals(AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE)) {
+               type = OBJECT_TYPE;
+            } else if (contentType.equals(AMQPMessageSupport.OCTET_STREAM_CONTENT_TYPE)) {

Review comment:
       Same as above

##########
File path: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
##########
@@ -1726,4 +1791,103 @@ public Object getOwner() {
    public void setOwner(Object object) {
       this.owner = object;
    }
+
+
+   // *******************************************************************************************************************************
+   // Composite Data implementation
+
+   private static MessageOpenTypeFactory AMQP_FACTORY = new AmqpMessageOpenTypeFactory();
+
+   static class AmqpMessageOpenTypeFactory extends MessageOpenTypeFactory<AMQPMessage> {
+      @Override
+      protected void init() throws OpenDataException {
+         super.init();
+         addItem(CompositeDataConstants.TEXT_BODY, CompositeDataConstants.TEXT_BODY, SimpleType.STRING);
+         addItem(CompositeDataConstants.TYPE, CompositeDataConstants.TYPE_DESCRIPTION, SimpleType.BYTE);
+      }
+
+      @Override
+      public Map<String, Object> getFields(AMQPMessage m, int valueSizeLimit, int delivery) throws OpenDataException {
+         Map<String, Object> rc = super.getFields(m, valueSizeLimit, delivery);
+
+         if (!m.isLargeMessage()) {
+            m.ensureScanning();
+         }
+
+         Properties properties = m.getCurrentProperties();
+
+         byte type = getType(m, properties);
+
+         rc.put(CompositeDataConstants.TYPE, type);
+
+         if (m.isLargeMessage())  {
+            rc.put(CompositeDataConstants.TEXT_BODY, "... Large message ...");
+         } else {
+            if (m.getBody() instanceof AmqpValue) {
+               Object amqpValue = ((AmqpValue) m.getBody()).getValue();
+
+               rc.put(CompositeDataConstants.TEXT_BODY, JsonUtil.truncateString(amqpValue.toString(), valueSizeLimit));
+            } else {
+               rc.put(CompositeDataConstants.TEXT_BODY, JsonUtil.truncateString("" + m.getBody(), valueSizeLimit));
+            }
+         }
+
+         return rc;
+      }
+
+      private byte getType(AMQPMessage m, Properties properties) {
+         if (m.isLargeMessage()) {
+            return DEFAULT_TYPE;
+         }
+         byte type = BYTES_TYPE;
+
+         final Symbol contentType = properties != null ? properties.getContentType() : null;
+         final String contentTypeString = contentType != null ? contentType.toString() : null;
+
+         if (m.getBody() instanceof Data) {
+
+            if (contentType.equals(AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE)) {

Review comment:
       The contentType variable can be null, either as the content type wasnt set (theres no requirement it be), or you even explicitly set it null if the properties section was omitted (again no requirement it is present). NPE waiting to happen.

##########
File path: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
##########
@@ -834,9 +844,64 @@ public final void receiveBuffer(ByteBuf buffer) {
          value = JsonUtil.truncate(value, valueSizeLimit);
          map.put(name.toString(), value);
       }
+
+      TypedProperties extraProperties = getExtraProperties();
+      if (extraProperties != null) {
+         extraProperties.forEach((s, o) -> {
+            map.put(s.toString(), JsonUtil.truncate(o.toString(), valueSizeLimit));
+         });
+      }
+      if (!isLargeMessage()) {
+         addAnnotationsAsProperties(map, messageAnnotations);
+      }
+
+      if (properties != null) {
+         if (properties.getContentType() != null) {
+            map.put("properties.getContentType()", properties.getContentType().toString());
+         }
+         if (properties.getContentEncoding() != null) {
+            map.put("properties.getContentEncoding()", properties.getContentEncoding().toString());
+         }
+         if (properties.getGroupId() != null) {
+            map.put("properties.getGroupID()", properties.getGroupId());
+         }
+         if (properties.getGroupSequence() != null) {
+            map.put("properties.getGroupSequence()", properties.getGroupSequence().intValue());
+         }
+         if (properties.getReplyToGroupId() != null) {
+            map.put("properties.getReplyToGroupId()", properties.getReplyToGroupId());
+         }
+      }
+
       return map;
    }
 
+
+   protected static void addAnnotationsAsProperties(Map map, MessageAnnotations annotations) {
+      if (annotations != null && annotations.getValue() != null) {
+         for (Map.Entry<?, ?> entry : annotations.getValue().entrySet()) {
+            String key = entry.getKey().toString();
+            if ("x-opt-delivery-time".equals(key) && entry.getValue() != null) {
+               long deliveryTime = ((Number) entry.getValue()).longValue();
+               map.put("annotation x-opt-delivery-time", deliveryTime);

Review comment:
       It would probably be clearer to say it is a message-annotation.
   
   As prior comment, seems odd to use different syntax.

##########
File path: artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
##########
@@ -1290,4 +1301,88 @@ public String getStringBody() {
 
       return body;
    }
+
+
+   // *******************************************************************************************************************************
+   // Composite Data implementation
+
+   private static MessageOpenTypeFactory TEXT_FACTORY = new TextMessageOpenTypeFactory();
+   private static MessageOpenTypeFactory BYTES_FACTORY = new BytesMessageOpenTypeFactory();
+
+
+   @Override
+   public CompositeData toCompositeData(int fieldsLimit, int deliveryCount) throws OpenDataException {
+      CompositeType ct;
+      Map<String, Object> fields;
+      byte type = getType();
+      switch (type) {
+         case Message.TEXT_TYPE:
+            ct = TEXT_FACTORY.getCompositeType();
+            fields = TEXT_FACTORY.getFields(this, fieldsLimit, deliveryCount);
+            break;
+         default:
+            ct = BYTES_FACTORY.getCompositeType();
+            fields = BYTES_FACTORY.getFields(this, fieldsLimit, deliveryCount);
+            break;
+      }
+      return new CompositeDataSupport(ct, fields);
+
+   }
+
+   static class BytesMessageOpenTypeFactory extends MessageOpenTypeFactory<CoreMessage> {
+      protected ArrayType body;
+
+      @Override
+      protected void init() throws OpenDataException {
+         super.init();
+         body = new ArrayType(SimpleType.BYTE, true);
+         addItem(CompositeDataConstants.TYPE, CompositeDataConstants.TYPE_DESCRIPTION, SimpleType.BYTE);
+         addItem(CompositeDataConstants.BODY, CompositeDataConstants.BODY_DESCRIPTION, body);
+      }
+
+      @Override
+      public Map<String, Object> getFields(CoreMessage m, int valueSizeLimit, int delivery) throws OpenDataException {
+         Map<String, Object> rc = super.getFields(m, valueSizeLimit, delivery);
+         rc.put(CompositeDataConstants.TYPE, m.getType());
+         if (!m.isLargeMessage()) {
+            ActiveMQBuffer bodyCopy = m.toCore().getReadOnlyBodyBuffer();
+            byte[] bytes = new byte[bodyCopy.readableBytes() <= valueSizeLimit ? bodyCopy.readableBytes() : valueSizeLimit + 1];
+            bodyCopy.readBytes(bytes);
+            rc.put(CompositeDataConstants.BODY, JsonUtil.truncate(bytes, valueSizeLimit));
+         } else {
+            rc.put(CompositeDataConstants.BODY, new byte[0]);
+         }
+         return rc;
+      }
+   }
+
+   static class TextMessageOpenTypeFactory extends MessageOpenTypeFactory<CoreMessage> {
+      @Override
+      protected void init() throws OpenDataException {
+         super.init();
+         addItem(CompositeDataConstants.TYPE, CompositeDataConstants.TYPE_DESCRIPTION, SimpleType.BYTE);
+         addItem(CompositeDataConstants.TEXT_BODY, CompositeDataConstants.TEXT_BODY, SimpleType.STRING);
+      }
+
+      @Override
+      public Map<String, Object> getFields(CoreMessage m, int valueSizeLimit, int delivery) throws OpenDataException {
+         Map<String, Object> rc = super.getFields(m, valueSizeLimit, delivery);
+         rc.put(CompositeDataConstants.TYPE, m.getType());
+         if (!m.isLargeMessage()) {
+            if (m.containsProperty(Message.HDR_LARGE_COMPRESSED)) {
+               rc.put(CompositeDataConstants.TEXT_BODY, "[compressed]");
+            } else {
+               SimpleString text = m.toCore().getReadOnlyBodyBuffer().readNullableSimpleString();

Review comment:
       Unnecessary call to toCore on what is already a CoreMessage instance.

##########
File path: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
##########
@@ -1726,4 +1791,103 @@ public Object getOwner() {
    public void setOwner(Object object) {
       this.owner = object;
    }
+
+
+   // *******************************************************************************************************************************
+   // Composite Data implementation
+
+   private static MessageOpenTypeFactory AMQP_FACTORY = new AmqpMessageOpenTypeFactory();
+
+   static class AmqpMessageOpenTypeFactory extends MessageOpenTypeFactory<AMQPMessage> {
+      @Override
+      protected void init() throws OpenDataException {
+         super.init();
+         addItem(CompositeDataConstants.TEXT_BODY, CompositeDataConstants.TEXT_BODY, SimpleType.STRING);
+         addItem(CompositeDataConstants.TYPE, CompositeDataConstants.TYPE_DESCRIPTION, SimpleType.BYTE);
+      }
+
+      @Override
+      public Map<String, Object> getFields(AMQPMessage m, int valueSizeLimit, int delivery) throws OpenDataException {
+         Map<String, Object> rc = super.getFields(m, valueSizeLimit, delivery);
+
+         if (!m.isLargeMessage()) {
+            m.ensureScanning();
+         }
+
+         Properties properties = m.getCurrentProperties();
+
+         byte type = getType(m, properties);
+
+         rc.put(CompositeDataConstants.TYPE, type);
+
+         if (m.isLargeMessage())  {
+            rc.put(CompositeDataConstants.TEXT_BODY, "... Large message ...");
+         } else {
+            if (m.getBody() instanceof AmqpValue) {
+               Object amqpValue = ((AmqpValue) m.getBody()).getValue();
+
+               rc.put(CompositeDataConstants.TEXT_BODY, JsonUtil.truncateString(amqpValue.toString(), valueSizeLimit));
+            } else {
+               rc.put(CompositeDataConstants.TEXT_BODY, JsonUtil.truncateString("" + m.getBody(), valueSizeLimit));
+            }
+         }
+
+         return rc;
+      }
+
+      private byte getType(AMQPMessage m, Properties properties) {
+         if (m.isLargeMessage()) {
+            return DEFAULT_TYPE;
+         }
+         byte type = BYTES_TYPE;
+
+         final Symbol contentType = properties != null ? properties.getContentType() : null;
+         final String contentTypeString = contentType != null ? contentType.toString() : null;
+
+         if (m.getBody() instanceof Data) {
+
+            if (contentType.equals(AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE)) {
+               type = OBJECT_TYPE;
+            } else if (contentType.equals(AMQPMessageSupport.OCTET_STREAM_CONTENT_TYPE)) {
+               type = BYTES_TYPE;
+            } else {
+               Charset charset = getCharsetForTextualContent(contentTypeString);
+               if (StandardCharsets.UTF_8.equals(charset)) {
+                  type = TEXT_TYPE;
+               }
+            }
+         } else if (m.getBody() instanceof AmqpSequence) {
+            type = STREAM_TYPE;
+         } else if (m.getBody() instanceof AmqpValue) {
+            Object value = ((AmqpValue) m.getBody()).getValue();
+
+            if (value instanceof String) {
+               type = TEXT_TYPE;
+            } else if (value instanceof Binary) {
+
+               if (contentType.equals(AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE)) {

Review comment:
       Same as above.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@activemq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org