You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/09/26 13:19:50 UTC

[3/5] activemq-artemis git commit: ARTEMIS-2096 Refactor AMQMessage abstraction

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a851a8f9/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
index 45ba931..e147687 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
@@ -98,23 +98,29 @@ import io.netty.buffer.PooledByteBufAllocator;
  * */
 public class AmqpCoreConverter {
 
-   @SuppressWarnings("unchecked")
    public static ICoreMessage toCore(AMQPMessage message, CoreMessageObjectPools coreMessageObjectPools) throws Exception {
+      return message.toCore(coreMessageObjectPools);
+   }
+
+   @SuppressWarnings("unchecked")
+   public static ICoreMessage toCore(AMQPMessage message, CoreMessageObjectPools coreMessageObjectPools, Header header, MessageAnnotations annotations, Properties properties, ApplicationProperties applicationProperties, Section body, Footer footer) throws Exception {
+      final long messageId = message.getMessageID();
+      final Symbol contentType = properties != null ? properties.getContentType() : null;
+      final String contentTypeString = contentType != null ? contentType.toString() : null;
 
-      Section body = message.getProtonMessage().getBody();
       ServerJMSMessage result;
 
       if (body == null) {
-         if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), message.getProtonMessage())) {
-            result = createObjectMessage(message.getMessageID(), coreMessageObjectPools);
-         } else if (isContentType(OCTET_STREAM_CONTENT_TYPE, message.getProtonMessage()) || isContentType(null, message.getProtonMessage())) {
-            result = createBytesMessage(message.getMessageID(), coreMessageObjectPools);
+         if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), contentType)) {
+            result = createObjectMessage(messageId, coreMessageObjectPools);
+         } else if (isContentType(OCTET_STREAM_CONTENT_TYPE, contentType) || isContentType(null, contentType)) {
+            result = createBytesMessage(messageId, coreMessageObjectPools);
          } else {
-            Charset charset = getCharsetForTextualContent(message.getProtonMessage().getContentType());
+            Charset charset = getCharsetForTextualContent(contentTypeString);
             if (charset != null) {
-               result = createTextMessage(message.getMessageID(), coreMessageObjectPools);
+               result = createTextMessage(messageId, coreMessageObjectPools);
             } else {
-               result = createMessage(message.getMessageID(), coreMessageObjectPools);
+               result = createMessage(messageId, coreMessageObjectPools);
             }
          }
 
@@ -122,30 +128,30 @@ public class AmqpCoreConverter {
       } else if (body instanceof Data) {
          Binary payload = ((Data) body).getValue();
 
-         if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), message.getProtonMessage())) {
-            result = createObjectMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
-         } else if (isContentType(OCTET_STREAM_CONTENT_TYPE, message.getProtonMessage())) {
-            result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
+         if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), contentType)) {
+            result = createObjectMessage(messageId, payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
+         } else if (isContentType(OCTET_STREAM_CONTENT_TYPE, contentType)) {
+            result = createBytesMessage(messageId, payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
          } else {
-            Charset charset = getCharsetForTextualContent(message.getProtonMessage().getContentType());
+            Charset charset = getCharsetForTextualContent(contentTypeString);
             if (StandardCharsets.UTF_8.equals(charset)) {
                ByteBuffer buf = ByteBuffer.wrap(payload.getArray(), payload.getArrayOffset(), payload.getLength());
 
                try {
                   CharBuffer chars = charset.newDecoder().decode(buf);
-                  result = createTextMessage(message.getMessageID(), String.valueOf(chars), coreMessageObjectPools);
+                  result = createTextMessage(messageId, String.valueOf(chars), coreMessageObjectPools);
                } catch (CharacterCodingException e) {
-                  result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
+                  result = createBytesMessage(messageId, payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
                }
             } else {
-               result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
+               result = createBytesMessage(messageId, payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
             }
          }
 
          result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_DATA);
       } else if (body instanceof AmqpSequence) {
          AmqpSequence sequence = (AmqpSequence) body;
-         ServerJMSStreamMessage m = createStreamMessage(message.getMessageID(), coreMessageObjectPools);
+         ServerJMSStreamMessage m = createStreamMessage(messageId, coreMessageObjectPools);
          for (Object item : sequence.getValue()) {
             m.writeObject(item);
          }
@@ -155,35 +161,35 @@ public class AmqpCoreConverter {
       } else if (body instanceof AmqpValue) {
          Object value = ((AmqpValue) body).getValue();
          if (value == null || value instanceof String) {
-            result = createTextMessage(message.getMessageID(), (String) value, coreMessageObjectPools);
+            result = createTextMessage(messageId, (String) value, coreMessageObjectPools);
 
             result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, value == null ? AMQP_VALUE_NULL : AMQP_VALUE_STRING);
          } else if (value instanceof Binary) {
             Binary payload = (Binary) value;
 
-            if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), message.getProtonMessage())) {
-               result = createObjectMessage(message.getMessageID(), payload, coreMessageObjectPools);
+            if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), contentType)) {
+               result = createObjectMessage(messageId, payload, coreMessageObjectPools);
             } else {
-               result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
+               result = createBytesMessage(messageId, payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
             }
 
             result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY);
          } else if (value instanceof List) {
-            ServerJMSStreamMessage m = createStreamMessage(message.getMessageID(), coreMessageObjectPools);
+            ServerJMSStreamMessage m = createStreamMessage(messageId, coreMessageObjectPools);
             for (Object item : (List<Object>) value) {
                m.writeObject(item);
             }
             result = m;
             result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_LIST);
          } else if (value instanceof Map) {
-            result = createMapMessage(message.getMessageID(), (Map<String, Object>) value, coreMessageObjectPools);
+            result = createMapMessage(messageId, (Map<String, Object>) value, coreMessageObjectPools);
             result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_MAP);
          } else {
             ByteBuf buf = PooledByteBufAllocator.DEFAULT.heapBuffer(1024);
             try {
                TLSEncode.getEncoder().setByteBuffer(new NettyWritable(buf));
                TLSEncode.getEncoder().writeObject(body);
-               result = createBytesMessage(message.getMessageID(), buf.array(), 0, buf.writerIndex(), coreMessageObjectPools);
+               result = createBytesMessage(messageId, buf.array(), 0, buf.writerIndex(), coreMessageObjectPools);
             } finally {
                buf.release();
                TLSEncode.getEncoder().setByteBuffer((WritableBuffer)null);
@@ -193,30 +199,38 @@ public class AmqpCoreConverter {
          throw new RuntimeException("Unexpected body type: " + body.getClass());
       }
 
-      TypedProperties properties = message.getExtraProperties();
-      if (properties != null) {
-         for (SimpleString str : properties.getPropertyNames()) {
-            if (str.equals(AMQPMessage.ADDRESS_PROPERTY)) {
-               continue;
-            }
-            result.getInnerMessage().putObjectProperty(str, properties.getProperty(str));
+      processHeader(result, header);
+      processMessageAnnotations(result, annotations);
+      processApplicationProperties(result, applicationProperties);
+      processProperties(result, properties);
+      processFooter(result, footer);
+      processExtraProperties(result, message.getExtraProperties());
+
+      // If the JMS expiration has not yet been set...
+      if (header != null && result.getJMSExpiration() == 0) {
+         // Then let's try to set it based on the message TTL.
+         long ttl = javax.jms.Message.DEFAULT_TIME_TO_LIVE;
+         if (header.getTtl() != null) {
+            ttl = header.getTtl().longValue();
+         }
+
+         if (ttl == 0) {
+            result.setJMSExpiration(0);
+         } else {
+            result.setJMSExpiration(System.currentTimeMillis() + ttl);
          }
       }
 
-      populateMessage(result, message.getProtonMessage());
       result.getInnerMessage().setReplyTo(message.getReplyTo());
       result.getInnerMessage().setDurable(message.isDurable());
       result.getInnerMessage().setPriority(message.getPriority());
       result.getInnerMessage().setAddress(message.getAddressSimpleString());
-
       result.encode();
 
-      return result != null ? result.getInnerMessage() : null;
+      return result.getInnerMessage();
    }
 
-   @SuppressWarnings("unchecked")
-   protected static ServerJMSMessage populateMessage(ServerJMSMessage jms, org.apache.qpid.proton.message.Message amqp) throws Exception {
-      Header header = amqp.getHeader();
+   protected static ServerJMSMessage processHeader(ServerJMSMessage jms, Header header) throws Exception {
       if (header != null) {
          jms.setBooleanProperty(JMS_AMQP_HEADER, true);
 
@@ -248,9 +262,12 @@ public class AmqpCoreConverter {
          jms.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
       }
 
-      final MessageAnnotations ma = amqp.getMessageAnnotations();
-      if (ma != null) {
-         for (Map.Entry<?, ?> entry : ma.getValue().entrySet()) {
+      return jms;
+   }
+
+   protected static ServerJMSMessage processMessageAnnotations(ServerJMSMessage jms, MessageAnnotations annotations) throws Exception {
+      if (annotations != null && annotations.getValue() != null) {
+         for (Map.Entry<?, ?> entry : annotations.getValue().entrySet()) {
             String key = entry.getKey().toString();
             if ("x-opt-delivery-time".equals(key) && entry.getValue() != null) {
                long deliveryTime = ((Number) entry.getValue()).longValue();
@@ -266,14 +283,33 @@ public class AmqpCoreConverter {
          }
       }
 
-      final ApplicationProperties ap = amqp.getApplicationProperties();
-      if (ap != null) {
-         for (Map.Entry<String, Object> entry : (Set<Map.Entry<String, Object>>) ap.getValue().entrySet()) {
+      return jms;
+   }
+
+   private static ServerJMSMessage processApplicationProperties(ServerJMSMessage jms, ApplicationProperties properties) throws Exception {
+      if (properties != null && properties.getValue() != null) {
+         for (Map.Entry<String, Object> entry : (Set<Map.Entry<String, Object>>) properties.getValue().entrySet()) {
             setProperty(jms, entry.getKey(), entry.getValue());
          }
       }
 
-      final Properties properties = amqp.getProperties();
+      return jms;
+   }
+
+   private static ServerJMSMessage processExtraProperties(ServerJMSMessage jms, TypedProperties properties) {
+      if (properties != null) {
+         for (SimpleString str : properties.getPropertyNames()) {
+            if (str.equals(AMQPMessage.ADDRESS_PROPERTY)) {
+               continue;
+            }
+            jms.getInnerMessage().putObjectProperty(str, properties.getProperty(str));
+         }
+      }
+
+      return jms;
+   }
+
+   private static ServerJMSMessage processProperties(ServerJMSMessage jms, Properties properties) throws Exception {
       if (properties != null) {
          if (properties.getMessageId() != null) {
             jms.setJMSMessageID(AMQPMessageIdHelper.INSTANCE.toMessageIdString(properties.getMessageId()));
@@ -317,24 +353,13 @@ public class AmqpCoreConverter {
          }
       }
 
-      // If the jms expiration has not yet been set...
-      if (header != null && jms.getJMSExpiration() == 0) {
-         // Then lets try to set it based on the message ttl.
-         long ttl = javax.jms.Message.DEFAULT_TIME_TO_LIVE;
-         if (header.getTtl() != null) {
-            ttl = header.getTtl().longValue();
-         }
-
-         if (ttl == 0) {
-            jms.setJMSExpiration(0);
-         } else {
-            jms.setJMSExpiration(System.currentTimeMillis() + ttl);
-         }
-      }
+      return jms;
+   }
 
-      final Footer fp = amqp.getFooter();
-      if (fp != null) {
-         for (Map.Entry<Object, Object> entry : (Set<Map.Entry<Object, Object>>) fp.getValue().entrySet()) {
+   @SuppressWarnings("unchecked")
+   private static ServerJMSMessage processFooter(ServerJMSMessage jms, Footer footer) throws Exception {
+      if (footer != null && footer.getValue() != null) {
+         for (Map.Entry<Object, Object> entry : (Set<Map.Entry<Object, Object>>) footer.getValue().entrySet()) {
             String key = entry.getKey().toString();
             setProperty(jms, JMS_AMQP_FOOTER_PREFIX + key, entry.getValue());
          }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a851a8f9/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java
index 940a746..e929406 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java
@@ -139,5 +139,4 @@ public class AmqpSupport {
 
       return null;
    }
-
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a851a8f9/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/NettyWritable.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/NettyWritable.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/NettyWritable.java
index bf46e81..d752bd7 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/NettyWritable.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/NettyWritable.java
@@ -117,11 +117,9 @@ public class NettyWritable implements WritableBuffer {
    @Override
    public void put(ReadableBuffer buffer) {
       if (buffer.hasArray()) {
-         nettyBuffer.writeBytes(buffer.array(), buffer.arrayOffset(), buffer.remaining());
+         nettyBuffer.writeBytes(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
       } else {
-         while (buffer.hasRemaining()) {
-            nettyBuffer.writeByte(buffer.get());
-         }
+         nettyBuffer.writeBytes(buffer.byteBuffer());
       }
    }
 }