You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/09/26 13:19:50 UTC
[3/5] activemq-artemis git commit: ARTEMIS-2096 Refactor AMQMessage
abstraction
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a851a8f9/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
index 45ba931..e147687 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
@@ -98,23 +98,29 @@ import io.netty.buffer.PooledByteBufAllocator;
* */
public class AmqpCoreConverter {
- @SuppressWarnings("unchecked")
public static ICoreMessage toCore(AMQPMessage message, CoreMessageObjectPools coreMessageObjectPools) throws Exception {
+ return message.toCore(coreMessageObjectPools);
+ }
+
+ @SuppressWarnings("unchecked")
+ public static ICoreMessage toCore(AMQPMessage message, CoreMessageObjectPools coreMessageObjectPools, Header header, MessageAnnotations annotations, Properties properties, ApplicationProperties applicationProperties, Section body, Footer footer) throws Exception {
+ final long messageId = message.getMessageID();
+ final Symbol contentType = properties != null ? properties.getContentType() : null;
+ final String contentTypeString = contentType != null ? contentType.toString() : null;
- Section body = message.getProtonMessage().getBody();
ServerJMSMessage result;
if (body == null) {
- if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), message.getProtonMessage())) {
- result = createObjectMessage(message.getMessageID(), coreMessageObjectPools);
- } else if (isContentType(OCTET_STREAM_CONTENT_TYPE, message.getProtonMessage()) || isContentType(null, message.getProtonMessage())) {
- result = createBytesMessage(message.getMessageID(), coreMessageObjectPools);
+ if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), contentType)) {
+ result = createObjectMessage(messageId, coreMessageObjectPools);
+ } else if (isContentType(OCTET_STREAM_CONTENT_TYPE, contentType) || isContentType(null, contentType)) {
+ result = createBytesMessage(messageId, coreMessageObjectPools);
} else {
- Charset charset = getCharsetForTextualContent(message.getProtonMessage().getContentType());
+ Charset charset = getCharsetForTextualContent(contentTypeString);
if (charset != null) {
- result = createTextMessage(message.getMessageID(), coreMessageObjectPools);
+ result = createTextMessage(messageId, coreMessageObjectPools);
} else {
- result = createMessage(message.getMessageID(), coreMessageObjectPools);
+ result = createMessage(messageId, coreMessageObjectPools);
}
}
@@ -122,30 +128,30 @@ public class AmqpCoreConverter {
} else if (body instanceof Data) {
Binary payload = ((Data) body).getValue();
- if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), message.getProtonMessage())) {
- result = createObjectMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
- } else if (isContentType(OCTET_STREAM_CONTENT_TYPE, message.getProtonMessage())) {
- result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
+ if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), contentType)) {
+ result = createObjectMessage(messageId, payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
+ } else if (isContentType(OCTET_STREAM_CONTENT_TYPE, contentType)) {
+ result = createBytesMessage(messageId, payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
} else {
- Charset charset = getCharsetForTextualContent(message.getProtonMessage().getContentType());
+ Charset charset = getCharsetForTextualContent(contentTypeString);
if (StandardCharsets.UTF_8.equals(charset)) {
ByteBuffer buf = ByteBuffer.wrap(payload.getArray(), payload.getArrayOffset(), payload.getLength());
try {
CharBuffer chars = charset.newDecoder().decode(buf);
- result = createTextMessage(message.getMessageID(), String.valueOf(chars), coreMessageObjectPools);
+ result = createTextMessage(messageId, String.valueOf(chars), coreMessageObjectPools);
} catch (CharacterCodingException e) {
- result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
+ result = createBytesMessage(messageId, payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
}
} else {
- result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
+ result = createBytesMessage(messageId, payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
}
}
result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_DATA);
} else if (body instanceof AmqpSequence) {
AmqpSequence sequence = (AmqpSequence) body;
- ServerJMSStreamMessage m = createStreamMessage(message.getMessageID(), coreMessageObjectPools);
+ ServerJMSStreamMessage m = createStreamMessage(messageId, coreMessageObjectPools);
for (Object item : sequence.getValue()) {
m.writeObject(item);
}
@@ -155,35 +161,35 @@ public class AmqpCoreConverter {
} else if (body instanceof AmqpValue) {
Object value = ((AmqpValue) body).getValue();
if (value == null || value instanceof String) {
- result = createTextMessage(message.getMessageID(), (String) value, coreMessageObjectPools);
+ result = createTextMessage(messageId, (String) value, coreMessageObjectPools);
result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, value == null ? AMQP_VALUE_NULL : AMQP_VALUE_STRING);
} else if (value instanceof Binary) {
Binary payload = (Binary) value;
- if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), message.getProtonMessage())) {
- result = createObjectMessage(message.getMessageID(), payload, coreMessageObjectPools);
+ if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), contentType)) {
+ result = createObjectMessage(messageId, payload, coreMessageObjectPools);
} else {
- result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
+ result = createBytesMessage(messageId, payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
}
result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY);
} else if (value instanceof List) {
- ServerJMSStreamMessage m = createStreamMessage(message.getMessageID(), coreMessageObjectPools);
+ ServerJMSStreamMessage m = createStreamMessage(messageId, coreMessageObjectPools);
for (Object item : (List<Object>) value) {
m.writeObject(item);
}
result = m;
result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_LIST);
} else if (value instanceof Map) {
- result = createMapMessage(message.getMessageID(), (Map<String, Object>) value, coreMessageObjectPools);
+ result = createMapMessage(messageId, (Map<String, Object>) value, coreMessageObjectPools);
result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_MAP);
} else {
ByteBuf buf = PooledByteBufAllocator.DEFAULT.heapBuffer(1024);
try {
TLSEncode.getEncoder().setByteBuffer(new NettyWritable(buf));
TLSEncode.getEncoder().writeObject(body);
- result = createBytesMessage(message.getMessageID(), buf.array(), 0, buf.writerIndex(), coreMessageObjectPools);
+ result = createBytesMessage(messageId, buf.array(), 0, buf.writerIndex(), coreMessageObjectPools);
} finally {
buf.release();
TLSEncode.getEncoder().setByteBuffer((WritableBuffer)null);
@@ -193,30 +199,38 @@ public class AmqpCoreConverter {
throw new RuntimeException("Unexpected body type: " + body.getClass());
}
- TypedProperties properties = message.getExtraProperties();
- if (properties != null) {
- for (SimpleString str : properties.getPropertyNames()) {
- if (str.equals(AMQPMessage.ADDRESS_PROPERTY)) {
- continue;
- }
- result.getInnerMessage().putObjectProperty(str, properties.getProperty(str));
+ processHeader(result, header);
+ processMessageAnnotations(result, annotations);
+ processApplicationProperties(result, applicationProperties);
+ processProperties(result, properties);
+ processFooter(result, footer);
+ processExtraProperties(result, message.getExtraProperties());
+
+ // If the JMS expiration has not yet been set...
+ if (header != null && result.getJMSExpiration() == 0) {
+ // Then lets try to set it based on the message TTL.
+ long ttl = javax.jms.Message.DEFAULT_TIME_TO_LIVE;
+ if (header.getTtl() != null) {
+ ttl = header.getTtl().longValue();
+ }
+
+ if (ttl == 0) {
+ result.setJMSExpiration(0);
+ } else {
+ result.setJMSExpiration(System.currentTimeMillis() + ttl);
}
}
- populateMessage(result, message.getProtonMessage());
result.getInnerMessage().setReplyTo(message.getReplyTo());
result.getInnerMessage().setDurable(message.isDurable());
result.getInnerMessage().setPriority(message.getPriority());
result.getInnerMessage().setAddress(message.getAddressSimpleString());
-
result.encode();
- return result != null ? result.getInnerMessage() : null;
+ return result.getInnerMessage();
}
- @SuppressWarnings("unchecked")
- protected static ServerJMSMessage populateMessage(ServerJMSMessage jms, org.apache.qpid.proton.message.Message amqp) throws Exception {
- Header header = amqp.getHeader();
+ protected static ServerJMSMessage processHeader(ServerJMSMessage jms, Header header) throws Exception {
if (header != null) {
jms.setBooleanProperty(JMS_AMQP_HEADER, true);
@@ -248,9 +262,12 @@ public class AmqpCoreConverter {
jms.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
}
- final MessageAnnotations ma = amqp.getMessageAnnotations();
- if (ma != null) {
- for (Map.Entry<?, ?> entry : ma.getValue().entrySet()) {
+ return jms;
+ }
+
+ protected static ServerJMSMessage processMessageAnnotations(ServerJMSMessage jms, MessageAnnotations annotations) throws Exception {
+ if (annotations != null && annotations.getValue() != null) {
+ for (Map.Entry<?, ?> entry : annotations.getValue().entrySet()) {
String key = entry.getKey().toString();
if ("x-opt-delivery-time".equals(key) && entry.getValue() != null) {
long deliveryTime = ((Number) entry.getValue()).longValue();
@@ -266,14 +283,33 @@ public class AmqpCoreConverter {
}
}
- final ApplicationProperties ap = amqp.getApplicationProperties();
- if (ap != null) {
- for (Map.Entry<String, Object> entry : (Set<Map.Entry<String, Object>>) ap.getValue().entrySet()) {
+ return jms;
+ }
+
+ private static ServerJMSMessage processApplicationProperties(ServerJMSMessage jms, ApplicationProperties properties) throws Exception {
+ if (properties != null && properties.getValue() != null) {
+ for (Map.Entry<String, Object> entry : (Set<Map.Entry<String, Object>>) properties.getValue().entrySet()) {
setProperty(jms, entry.getKey(), entry.getValue());
}
}
- final Properties properties = amqp.getProperties();
+ return jms;
+ }
+
+ private static ServerJMSMessage processExtraProperties(ServerJMSMessage jms, TypedProperties properties) {
+ if (properties != null) {
+ for (SimpleString str : properties.getPropertyNames()) {
+ if (str.equals(AMQPMessage.ADDRESS_PROPERTY)) {
+ continue;
+ }
+ jms.getInnerMessage().putObjectProperty(str, properties.getProperty(str));
+ }
+ }
+
+ return jms;
+ }
+
+ private static ServerJMSMessage processProperties(ServerJMSMessage jms, Properties properties) throws Exception {
if (properties != null) {
if (properties.getMessageId() != null) {
jms.setJMSMessageID(AMQPMessageIdHelper.INSTANCE.toMessageIdString(properties.getMessageId()));
@@ -317,24 +353,13 @@ public class AmqpCoreConverter {
}
}
- // If the jms expiration has not yet been set...
- if (header != null && jms.getJMSExpiration() == 0) {
- // Then lets try to set it based on the message ttl.
- long ttl = javax.jms.Message.DEFAULT_TIME_TO_LIVE;
- if (header.getTtl() != null) {
- ttl = header.getTtl().longValue();
- }
-
- if (ttl == 0) {
- jms.setJMSExpiration(0);
- } else {
- jms.setJMSExpiration(System.currentTimeMillis() + ttl);
- }
- }
+ return jms;
+ }
- final Footer fp = amqp.getFooter();
- if (fp != null) {
- for (Map.Entry<Object, Object> entry : (Set<Map.Entry<Object, Object>>) fp.getValue().entrySet()) {
+ @SuppressWarnings("unchecked")
+ private static ServerJMSMessage processFooter(ServerJMSMessage jms, Footer footer) throws Exception {
+ if (footer != null && footer.getValue() != null) {
+ for (Map.Entry<Object, Object> entry : (Set<Map.Entry<Object, Object>>) footer.getValue().entrySet()) {
String key = entry.getKey().toString();
setProperty(jms, JMS_AMQP_FOOTER_PREFIX + key, entry.getValue());
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a851a8f9/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java
index 940a746..e929406 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java
@@ -139,5 +139,4 @@ public class AmqpSupport {
return null;
}
-
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a851a8f9/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/NettyWritable.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/NettyWritable.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/NettyWritable.java
index bf46e81..d752bd7 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/NettyWritable.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/NettyWritable.java
@@ -117,11 +117,9 @@ public class NettyWritable implements WritableBuffer {
@Override
public void put(ReadableBuffer buffer) {
if (buffer.hasArray()) {
- nettyBuffer.writeBytes(buffer.array(), buffer.arrayOffset(), buffer.remaining());
+ nettyBuffer.writeBytes(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
} else {
- while (buffer.hasRemaining()) {
- nettyBuffer.writeByte(buffer.get());
- }
+ nettyBuffer.writeBytes(buffer.byteBuffer());
}
}
}