You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/10/07 14:51:04 UTC
[3/5] activemq-artemis git commit: ARTEMIS-770 AMQP Message Transformer refactor
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/62627bf2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformer.java
index 9dd29ab..629c499 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformer.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformer.java
@@ -1,13 +1,13 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,27 +16,50 @@
*/
package org.apache.activemq.artemis.protocol.amqp.converter.message;
-import javax.jms.BytesMessage;
-import javax.jms.MapMessage;
-import javax.jms.Message;
-import javax.jms.ObjectMessage;
-import javax.jms.StreamMessage;
-import javax.jms.TextMessage;
-import java.io.Serializable;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_DATA;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_NULL;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_SEQUENCE;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_VALUE_BINARY;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_VALUE_LIST;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_VALUE_MAP;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_VALUE_NULL;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_VALUE_STRING;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_MESSAGE_FORMAT;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_ORIGINAL_ENCODING;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.OCTET_STREAM_CONTENT_TYPE;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.createBytesMessage;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.createMapMessage;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.createMessage;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.createObjectMessage;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.createStreamMessage;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.createTextMessage;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.getCharsetForTextualContent;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.isContentType;
+
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
-import java.util.Set;
+import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage;
+import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage;
+import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
+import org.apache.activemq.artemis.utils.IDGenerator;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.amqp.messaging.Section;
+import org.apache.qpid.proton.message.Message;
public class JMSMappingInboundTransformer extends InboundTransformer {
- public JMSMappingInboundTransformer(JMSVendor vendor) {
- super(vendor);
+ public JMSMappingInboundTransformer(IDGenerator idGenerator) {
+ super(idGenerator);
}
@Override
@@ -46,75 +69,128 @@ public class JMSMappingInboundTransformer extends InboundTransformer {
@Override
public InboundTransformer getFallbackTransformer() {
- return new AMQPNativeInboundTransformer(getVendor());
+ return new AMQPNativeInboundTransformer(idGenerator);
}
- @SuppressWarnings({"unchecked"})
@Override
- public Message transform(EncodedMessage amqpMessage) throws Exception {
- org.apache.qpid.proton.message.Message amqp = amqpMessage.decode();
+ public ServerJMSMessage transform(EncodedMessage encodedMessage) throws Exception {
+ ServerJMSMessage transformedMessage = null;
+
+ try {
+ Message amqpMessage = encodedMessage.decode();
+ transformedMessage = createServerMessage(amqpMessage);
+ populateMessage(transformedMessage, amqpMessage);
+ } catch (Exception ex) {
+ InboundTransformer transformer = this.getFallbackTransformer();
+
+ while (transformer != null) {
+ try {
+ transformedMessage = transformer.transform(encodedMessage);
+ break;
+ } catch (Exception e) {
+ transformer = transformer.getFallbackTransformer();
+ }
+ }
+ }
+
+ // Regardless of the transformer that finally decoded the message we need to ensure that
+ // the AMQP Message Format value is preserved for application on retransmit.
+ if (transformedMessage != null && encodedMessage.getMessageFormat() != 0) {
+ transformedMessage.setLongProperty(JMS_AMQP_MESSAGE_FORMAT, encodedMessage.getMessageFormat());
+ }
+
+ return transformedMessage;
+ }
+
+ @SuppressWarnings("unchecked")
+ private ServerJMSMessage createServerMessage(Message message) throws Exception {
+
+ Section body = message.getBody();
+ ServerJMSMessage result;
- Message rc;
- final Section body = amqp.getBody();
if (body == null) {
- rc = vendor.createMessage();
+ if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE, message)) {
+ result = createObjectMessage(idGenerator);
+ } else if (isContentType(OCTET_STREAM_CONTENT_TYPE, message) || isContentType(null, message)) {
+ result = createBytesMessage(idGenerator);
+ } else {
+ Charset charset = getCharsetForTextualContent(message.getContentType());
+ if (charset != null) {
+ result = createTextMessage(idGenerator);
+ } else {
+ result = createMessage(idGenerator);
+ }
+ }
+
+ result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_NULL);
} else if (body instanceof Data) {
- Binary d = ((Data) body).getValue();
- BytesMessage m = vendor.createBytesMessage();
- m.writeBytes(d.getArray(), d.getArrayOffset(), d.getLength());
- rc = m;
+ Binary payload = ((Data) body).getValue();
+
+ if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE, message)) {
+ result = createObjectMessage(idGenerator, payload.getArray(), payload.getArrayOffset(), payload.getLength());
+ } else if (isContentType(OCTET_STREAM_CONTENT_TYPE, message)) {
+ result = createBytesMessage(idGenerator, payload.getArray(), payload.getArrayOffset(), payload.getLength());
+ } else {
+ Charset charset = getCharsetForTextualContent(message.getContentType());
+ if (StandardCharsets.UTF_8.equals(charset)) {
+ ByteBuffer buf = ByteBuffer.wrap(payload.getArray(), payload.getArrayOffset(), payload.getLength());
+
+ try {
+ CharBuffer chars = charset.newDecoder().decode(buf);
+ result = createTextMessage(idGenerator, String.valueOf(chars));
+ } catch (CharacterCodingException e) {
+ result = createBytesMessage(idGenerator, payload.getArray(), payload.getArrayOffset(), payload.getLength());
+ }
+ } else {
+ result = createBytesMessage(idGenerator, payload.getArray(), payload.getArrayOffset(), payload.getLength());
+ }
+ }
+
+ result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_DATA);
} else if (body instanceof AmqpSequence) {
AmqpSequence sequence = (AmqpSequence) body;
- StreamMessage m = vendor.createStreamMessage();
+ ServerJMSStreamMessage m = createStreamMessage(idGenerator);
for (Object item : sequence.getValue()) {
m.writeObject(item);
}
- rc = m;
- m.setStringProperty(AMQPMessageTypes.AMQP_TYPE_KEY, AMQPMessageTypes.AMQP_SEQUENCE);
+
+ result = m;
+ result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_SEQUENCE);
} else if (body instanceof AmqpValue) {
Object value = ((AmqpValue) body).getValue();
- if (value == null) {
- rc = vendor.createObjectMessage();
- }
- if (value instanceof String) {
- TextMessage m = vendor.createTextMessage();
- m.setText((String) value);
- rc = m;
+ if (value == null || value instanceof String) {
+ result = createTextMessage(idGenerator, (String) value);
+
+ result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, value == null ? AMQP_VALUE_NULL : AMQP_VALUE_STRING);
} else if (value instanceof Binary) {
- Binary d = (Binary) value;
- BytesMessage m = vendor.createBytesMessage();
- m.writeBytes(d.getArray(), d.getArrayOffset(), d.getLength());
- rc = m;
+ Binary payload = (Binary) value;
+
+ if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE, message)) {
+ result = createObjectMessage(idGenerator, payload);
+ } else {
+ result = createBytesMessage(idGenerator, payload.getArray(), payload.getArrayOffset(), payload.getLength());
+ }
+
+ result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY);
} else if (value instanceof List) {
- StreamMessage m = vendor.createStreamMessage();
+ ServerJMSStreamMessage m = createStreamMessage(idGenerator);
for (Object item : (List<Object>) value) {
m.writeObject(item);
}
- rc = m;
- m.setStringProperty(AMQPMessageTypes.AMQP_TYPE_KEY, AMQPMessageTypes.AMQP_LIST);
+ result = m;
+ result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_LIST);
} else if (value instanceof Map) {
- MapMessage m = vendor.createMapMessage();
- final Set<Map.Entry<String, Object>> set = ((Map<String, Object>) value).entrySet();
- for (Map.Entry<String, Object> entry : set) {
- m.setObject(entry.getKey(), entry.getValue());
- }
- rc = m;
+ result = createMapMessage(idGenerator, (Map<String, Object>) value);
+ result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_MAP);
} else {
- ObjectMessage m = vendor.createObjectMessage();
- m.setObject((Serializable) value);
- rc = m;
+ // Trigger fall-back to native encoder which generates BytesMessage with the
+ // original message stored in the message body.
+ throw new ActiveMQAMQPInternalErrorException("Unable to encode to ActiveMQ JMS Message");
}
} else {
throw new RuntimeException("Unexpected body type: " + body.getClass());
}
- rc.setJMSDeliveryMode(defaultDeliveryMode);
- rc.setJMSPriority(defaultPriority);
- rc.setJMSExpiration(defaultTtl);
-
- populateMessage(rc, amqp);
- rc.setLongProperty(prefixVendor + "MESSAGE_FORMAT", amqpMessage.getMessageFormat());
- rc.setBooleanProperty(prefixVendor + "NATIVE", false);
- return rc;
+ return result;
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/62627bf2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java
index 9f28a6b..9ee0344 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java
@@ -1,13 +1,13 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,30 +16,61 @@
*/
package org.apache.activemq.artemis.protocol.amqp.converter.message;
-import javax.jms.BytesMessage;
-import javax.jms.DeliveryMode;
+import static org.apache.activemq.artemis.api.core.Message.HDR_SCHEDULED_DELIVERY_TIME;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_DATA;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_NULL;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_SEQUENCE;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_UNKNOWN;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_VALUE_BINARY;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_VALUE_LIST;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_VALUE_STRING;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.EMPTY_BINARY;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_CONTENT_ENCODING;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_CONTENT_TYPE;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_DELIVERY_ANNOTATION_PREFIX;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_FIRST_ACQUIRER;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_FOOTER_PREFIX;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_HEADER;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_MESSAGE_ANNOTATION_PREFIX;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_MESSAGE_FORMAT;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_NATIVE;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_ORIGINAL_ENCODING;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_PREFIX;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_PROPERTIES;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_REPLYTO_GROUP_ID;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.toAddress;
+
+import java.io.UnsupportedEncodingException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+
import javax.jms.Destination;
import javax.jms.JMSException;
-import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageEOFException;
-import javax.jms.ObjectMessage;
import javax.jms.Queue;
-import javax.jms.StreamMessage;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import javax.jms.TextMessage;
import javax.jms.Topic;
-import java.io.UnsupportedEncodingException;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.Enumeration;
-import java.util.HashMap;
import org.apache.activemq.artemis.core.message.impl.MessageInternal;
+import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage;
+import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMapMessage;
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage;
+import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSObjectMessage;
+import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage;
+import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSTextMessage;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPIllegalStateException;
+import org.apache.activemq.artemis.reader.MessageUtil;
+import org.apache.activemq.artemis.utils.IDGenerator;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.UnsignedByte;
@@ -54,12 +85,16 @@ import org.apache.qpid.proton.amqp.messaging.Header;
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.amqp.messaging.Properties;
import org.apache.qpid.proton.amqp.messaging.Section;
-import org.apache.qpid.proton.message.ProtonJMessage;
+import org.apache.qpid.proton.codec.AMQPDefinedTypes;
+import org.apache.qpid.proton.codec.DecoderImpl;
+import org.apache.qpid.proton.codec.EncoderImpl;
+import org.apache.qpid.proton.codec.WritableBuffer;
import org.jboss.logging.Logger;
public class JMSMappingOutboundTransformer extends OutboundTransformer {
private static final Logger logger = Logger.getLogger(JMSMappingOutboundTransformer.class);
+
public static final Symbol JMS_DEST_TYPE_MSG_ANNOTATION = Symbol.valueOf("x-opt-jms-dest");
public static final Symbol JMS_REPLY_TO_TYPE_MSG_ANNOTATION = Symbol.valueOf("x-opt-jms-reply-to");
@@ -68,227 +103,458 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
public static final byte TEMP_QUEUE_TYPE = 0x02;
public static final byte TEMP_TOPIC_TYPE = 0x03;
- public JMSMappingOutboundTransformer(JMSVendor vendor) {
- super(vendor);
+ // For now Proton requires that we create a decoder to create an encoder
+ private static class EncoderDecoderPair {
+ DecoderImpl decoder = new DecoderImpl();
+ EncoderImpl encoder = new EncoderImpl(decoder);
+ {
+ AMQPDefinedTypes.registerAllTypes(decoder, encoder);
+ }
}
- /**
- * Perform the conversion between JMS Message and Proton Message without
- * re-encoding it to array. This is needed because some frameworks may elect
- * to do this on their own way (Netty for instance using Nettybuffers)
- *
- * @param msg
- * @return
- * @throws Exception
- */
- public ProtonJMessage convert(Message msg) throws JMSException, UnsupportedEncodingException {
- Header header = new Header();
- Properties props = new Properties();
- HashMap<Symbol, Object> daMap = null;
- HashMap<Symbol, Object> maMap = null;
- HashMap apMap = null;
- Section body = null;
- HashMap footerMap = null;
- if (msg instanceof BytesMessage) {
- BytesMessage m = (BytesMessage) msg;
- byte[] data = new byte[(int) m.getBodyLength()];
- m.readBytes(data);
- m.reset(); // Need to reset after readBytes or future readBytes
- // calls (ex: redeliveries) will fail and return -1
- body = new Data(new Binary(data));
- }
- if (msg instanceof TextMessage) {
- body = new AmqpValue(((TextMessage) msg).getText());
- }
- if (msg instanceof MapMessage) {
- final HashMap<String, Object> map = new HashMap<>();
- final MapMessage m = (MapMessage) msg;
- final Enumeration<String> names = m.getMapNames();
- while (names.hasMoreElements()) {
- String key = names.nextElement();
- map.put(key, m.getObject(key));
- }
- body = new AmqpValue(map);
+ private static final ThreadLocal<EncoderDecoderPair> tlsCodec = new ThreadLocal<EncoderDecoderPair>() {
+ @Override
+ protected EncoderDecoderPair initialValue() {
+ return new EncoderDecoderPair();
}
- if (msg instanceof StreamMessage) {
- ArrayList<Object> list = new ArrayList<>();
- final StreamMessage m = (StreamMessage) msg;
- try {
- while (true) {
- list.add(m.readObject());
- }
- } catch (MessageEOFException e) {
- }
+ };
- String amqpType = msg.getStringProperty(AMQPMessageTypes.AMQP_TYPE_KEY);
- if (amqpType.equals(AMQPMessageTypes.AMQP_LIST)) {
- body = new AmqpValue(list);
- } else {
- body = new AmqpSequence(list);
- }
- }
- if (msg instanceof ObjectMessage) {
- body = new AmqpValue(((ObjectMessage) msg).getObject());
+ public JMSMappingOutboundTransformer(IDGenerator idGenerator) {
+ super(idGenerator);
+ }
+
+ @Override
+ public long transform(ServerJMSMessage message, WritableBuffer buffer) throws JMSException, UnsupportedEncodingException {
+ if (message == null) {
+ return 0;
}
- if (body == null && msg instanceof ServerJMSMessage) {
+ long messageFormat = 0;
+ Header header = null;
+ Properties properties = null;
+ Map<Symbol, Object> daMap = null;
+ Map<Symbol, Object> maMap = null;
+ Map<String, Object> apMap = null;
+ Map<Object, Object> footerMap = null;
- MessageInternal internalMessage = ((ServerJMSMessage) msg).getInnerMessage();
- if (!internalMessage.containsProperty("AMQP_MESSAGE_FORMAT")) {
- int readerIndex = internalMessage.getBodyBuffer().readerIndex();
- try {
- Object s = internalMessage.getBodyBuffer().readNullableSimpleString();
- if (s != null) {
- body = new AmqpValue(s.toString());
- }
- } catch (Throwable ignored) {
- logger.debug("Exception ignored during conversion, should be ok!", ignored.getMessage(), ignored);
- } finally {
- internalMessage.getBodyBuffer().readerIndex(readerIndex);
- }
+ Section body = convertBody(message);
+
+ if (message.getInnerMessage().isDurable()) {
+ if (header == null) {
+ header = new Header();
}
+ header.setDurable(true);
}
-
- header.setDurable(msg.getJMSDeliveryMode() == DeliveryMode.PERSISTENT ? true : false);
- header.setPriority(new UnsignedByte((byte) msg.getJMSPriority()));
- if (msg.getJMSType() != null) {
- props.setSubject(msg.getJMSType());
+ byte priority = (byte) message.getJMSPriority();
+ if (priority != Message.DEFAULT_PRIORITY) {
+ if (header == null) {
+ header = new Header();
+ }
+ header.setPriority(UnsignedByte.valueOf(priority));
}
- if (msg.getJMSMessageID() != null) {
-
- String msgId = msg.getJMSMessageID();
-
+ String type = message.getJMSType();
+ if (type != null) {
+ if (properties == null) {
+ properties = new Properties();
+ }
+ properties.setSubject(type);
+ }
+ String messageId = message.getJMSMessageID();
+ if (messageId != null) {
+ if (properties == null) {
+ properties = new Properties();
+ }
try {
- props.setMessageId(AMQPMessageIdHelper.INSTANCE.toIdObject(msgId));
+ properties.setMessageId(AMQPMessageIdHelper.INSTANCE.toIdObject(messageId));
} catch (ActiveMQAMQPIllegalStateException e) {
- props.setMessageId(msgId);
+ properties.setMessageId(messageId);
}
}
- if (msg.getJMSDestination() != null) {
- props.setTo(vendor.toAddress(msg.getJMSDestination()));
+ Destination destination = message.getJMSDestination();
+ if (destination != null) {
+ if (properties == null) {
+ properties = new Properties();
+ }
+ properties.setTo(toAddress(destination));
if (maMap == null) {
- maMap = new HashMap<>();
+ maMap = new HashMap<Symbol, Object>();
}
- maMap.put(JMS_DEST_TYPE_MSG_ANNOTATION, destinationType(msg.getJMSDestination()));
+ maMap.put(JMS_DEST_TYPE_MSG_ANNOTATION, destinationType(destination));
}
- if (msg.getJMSReplyTo() != null) {
- props.setReplyTo(vendor.toAddress(msg.getJMSReplyTo()));
+ Destination replyTo = message.getJMSReplyTo();
+ if (replyTo != null) {
+ if (properties == null) {
+ properties = new Properties();
+ }
+ properties.setReplyTo(toAddress(replyTo));
if (maMap == null) {
- maMap = new HashMap<>();
+ maMap = new HashMap<Symbol, Object>();
}
- maMap.put(JMS_REPLY_TO_TYPE_MSG_ANNOTATION, destinationType(msg.getJMSReplyTo()));
+ maMap.put(JMS_REPLY_TO_TYPE_MSG_ANNOTATION, destinationType(replyTo));
}
- if (msg.getJMSCorrelationID() != null) {
- String correlationId = msg.getJMSCorrelationID();
-
+ String correlationId = message.getJMSCorrelationID();
+ if (correlationId != null) {
+ if (properties == null) {
+ properties = new Properties();
+ }
try {
- props.setCorrelationId(AMQPMessageIdHelper.INSTANCE.toIdObject(correlationId));
+ properties.setCorrelationId(AMQPMessageIdHelper.INSTANCE.toIdObject(correlationId));
} catch (ActiveMQAMQPIllegalStateException e) {
- props.setCorrelationId(correlationId);
+ properties.setCorrelationId(correlationId);
}
}
- if (msg.getJMSExpiration() != 0) {
- long ttl = msg.getJMSExpiration() - System.currentTimeMillis();
+ long expiration = message.getJMSExpiration();
+ if (expiration != 0) {
+ long ttl = expiration - System.currentTimeMillis();
if (ttl < 0) {
ttl = 1;
}
+
+ if (header == null) {
+ header = new Header();
+ }
header.setTtl(new UnsignedInteger((int) ttl));
- props.setAbsoluteExpiryTime(new Date(msg.getJMSExpiration()));
+ if (properties == null) {
+ properties = new Properties();
+ }
+ properties.setAbsoluteExpiryTime(new Date(expiration));
}
- if (msg.getJMSTimestamp() != 0) {
- props.setCreationTime(new Date(msg.getJMSTimestamp()));
+ long timeStamp = message.getJMSTimestamp();
+ if (timeStamp != 0) {
+ if (properties == null) {
+ properties = new Properties();
+ }
+ properties.setCreationTime(new Date(timeStamp));
}
- final Enumeration<String> keys = msg.getPropertyNames();
- while (keys.hasMoreElements()) {
- String key = keys.nextElement();
- if (key.equals(messageFormatKey) || key.equals(nativeKey) || key.equals(ServerJMSMessage.NATIVE_MESSAGE_ID)) {
- // skip..
- } else if (key.equals(firstAcquirerKey)) {
- header.setFirstAcquirer(msg.getBooleanProperty(key));
- } else if (key.startsWith("JMSXDeliveryCount")) {
- // The AMQP delivery-count field only includes prior failed delivery attempts,
- // whereas JMSXDeliveryCount includes the first/current delivery attempt.
- int amqpDeliveryCount = msg.getIntProperty(key) - 1;
- if (amqpDeliveryCount > 0) {
- header.setDeliveryCount(new UnsignedInteger(amqpDeliveryCount));
- }
- } else if (key.startsWith("JMSXUserID")) {
- String value = msg.getStringProperty(key);
- props.setUserId(new Binary(value.getBytes(StandardCharsets.UTF_8)));
- } else if (key.startsWith("JMSXGroupID") || key.startsWith("_AMQ_GROUP_ID")) {
- String value = msg.getStringProperty(key);
- props.setGroupId(value);
- if (apMap == null) {
- apMap = new HashMap();
- }
- apMap.put(key, value);
- } else if (key.startsWith("JMSXGroupSeq")) {
- UnsignedInteger value = new UnsignedInteger(msg.getIntProperty(key));
- props.setGroupSequence(value);
- if (apMap == null) {
- apMap = new HashMap();
- }
- apMap.put(key, value);
- } else if (key.startsWith(prefixDeliveryAnnotationsKey)) {
- if (daMap == null) {
- daMap = new HashMap<>();
+ final Set<String> keySet = MessageUtil.getPropertyNames(message.getInnerMessage());
+ for (String key : keySet) {
+ if (key.startsWith("JMSX")) {
+ if (key.equals("JMSXDeliveryCount")) {
+ // The AMQP delivery-count field only includes prior failed delivery attempts,
+ // whereas JMSXDeliveryCount includes the first/current delivery attempt.
+ int amqpDeliveryCount = message.getDeliveryCount() - 1;
+ if (amqpDeliveryCount > 0) {
+ if (header == null) {
+ header = new Header();
+ }
+ header.setDeliveryCount(new UnsignedInteger(amqpDeliveryCount));
+ }
+ continue;
+ } else if (key.equals("JMSXUserID")) {
+ String value = message.getStringProperty(key);
+ if (properties == null) {
+ properties = new Properties();
+ }
+ properties.setUserId(new Binary(value.getBytes(StandardCharsets.UTF_8)));
+ continue;
+ } else if (key.equals("JMSXGroupID")) {
+ String value = message.getStringProperty(key);
+ if (properties == null) {
+ properties = new Properties();
+ }
+ properties.setGroupId(value);
+ continue;
+ } else if (key.equals("JMSXGroupSeq")) {
+ UnsignedInteger value = new UnsignedInteger(message.getIntProperty(key));
+ if (properties == null) {
+ properties = new Properties();
+ }
+ properties.setGroupSequence(value);
+ continue;
}
- String name = key.substring(prefixDeliveryAnnotationsKey.length());
- daMap.put(Symbol.valueOf(name), msg.getObjectProperty(key));
- } else if (key.startsWith(prefixMessageAnnotationsKey)) {
- if (maMap == null) {
- maMap = new HashMap<>();
+ } else if (key.startsWith(JMS_AMQP_PREFIX)) {
+ // AMQP Message Information stored from a conversion to the Core Message
+ if (key.equals(JMS_AMQP_MESSAGE_FORMAT)) {
+ messageFormat = message.getLongProperty(JMS_AMQP_MESSAGE_FORMAT);
+ continue;
+ } else if (key.equals(JMS_AMQP_NATIVE)) {
+ // skip..internal use only
+ continue;
+ } else if (key.equals(JMS_AMQP_ORIGINAL_ENCODING)) {
+ // skip..internal use only
+ continue;
+ } else if (key.equals(JMS_AMQP_FIRST_ACQUIRER)) {
+ if (header == null) {
+ header = new Header();
+ }
+ header.setFirstAcquirer(message.getBooleanProperty(key));
+ continue;
+ } else if (key.equals(JMS_AMQP_HEADER)) {
+ if (header == null) {
+ header = new Header();
+ }
+ continue;
+ } else if (key.startsWith(JMS_AMQP_PROPERTIES)) {
+ if (properties == null) {
+ properties = new Properties();
+ }
+ continue;
+ } else if (key.startsWith(JMS_AMQP_DELIVERY_ANNOTATION_PREFIX)) {
+ if (daMap == null) {
+ daMap = new HashMap<Symbol, Object>();
+ }
+ String name = key.substring(JMS_AMQP_DELIVERY_ANNOTATION_PREFIX.length());
+ daMap.put(Symbol.valueOf(name), message.getObjectProperty(key));
+ continue;
+ } else if (key.startsWith(JMS_AMQP_MESSAGE_ANNOTATION_PREFIX)) {
+ if (maMap == null) {
+ maMap = new HashMap<Symbol, Object>();
+ }
+ String name = key.substring(JMS_AMQP_MESSAGE_ANNOTATION_PREFIX.length());
+ maMap.put(Symbol.valueOf(name), message.getObjectProperty(key));
+ continue;
+ } else if (key.equals(JMS_AMQP_CONTENT_TYPE)) {
+ if (properties == null) {
+ properties = new Properties();
+ }
+ properties.setContentType(Symbol.getSymbol(message.getStringProperty(key)));
+ continue;
+ } else if (key.equals(JMS_AMQP_CONTENT_ENCODING)) {
+ if (properties == null) {
+ properties = new Properties();
+ }
+ properties.setContentEncoding(Symbol.getSymbol(message.getStringProperty(key)));
+ continue;
+ } else if (key.equals(JMS_AMQP_REPLYTO_GROUP_ID)) {
+ if (properties == null) {
+ properties = new Properties();
+ }
+ properties.setReplyToGroupId(message.getStringProperty(key));
+ continue;
+ } else if (key.startsWith(JMS_AMQP_FOOTER_PREFIX)) {
+ if (footerMap == null) {
+ footerMap = new HashMap<Object, Object>();
+ }
+ String name = key.substring(JMS_AMQP_FOOTER_PREFIX.length());
+ footerMap.put(name, message.getObjectProperty(key));
+ continue;
}
- String name = key.substring(prefixMessageAnnotationsKey.length());
- maMap.put(Symbol.valueOf(name), msg.getObjectProperty(key));
- } else if (key.equals(contentTypeKey)) {
- props.setContentType(Symbol.getSymbol(msg.getStringProperty(key)));
- } else if (key.equals(contentEncodingKey)) {
- props.setContentEncoding(Symbol.getSymbol(msg.getStringProperty(key)));
- } else if (key.equals(replyToGroupIDKey)) {
- props.setReplyToGroupId(msg.getStringProperty(key));
- } else if (key.startsWith(prefixFooterKey)) {
- if (footerMap == null) {
- footerMap = new HashMap();
+ } else if (key.equals("_AMQ_GROUP_ID")) {
+ String value = message.getStringProperty(key);
+ if (properties == null) {
+ properties = new Properties();
}
- String name = key.substring(prefixFooterKey.length());
- footerMap.put(name, msg.getObjectProperty(key));
+ properties.setGroupId(value);
+ continue;
+ } else if (key.equals(ServerJMSMessage.NATIVE_MESSAGE_ID)) {
+ // skip..internal use only
+ continue;
+ } else if (key.endsWith(HDR_SCHEDULED_DELIVERY_TIME.toString())) {
+ // skip..remove annotation from previous inbound transformation
+ continue;
} else if (key.equals(AMQPMessageTypes.AMQP_TYPE_KEY)) {
- // skip
- } else {
- if (apMap == null) {
- apMap = new HashMap();
- }
- Object objectProperty = msg.getObjectProperty(key);
- if (objectProperty instanceof byte[]) {
- Binary binary = new Binary((byte[]) objectProperty);
- apMap.put(key, binary);
- } else {
- apMap.put(key, objectProperty);
- }
+ // skip..internal use only - TODO - Remove this deprecated value in future release.
+ continue;
+ }
+
+ if (apMap == null) {
+ apMap = new HashMap<String, Object>();
}
+
+ Object objectProperty = message.getObjectProperty(key);
+ if (objectProperty instanceof byte[]) {
+ objectProperty = new Binary((byte[]) objectProperty);
+ }
+
+ apMap.put(key, objectProperty);
}
- MessageAnnotations ma = null;
- if (maMap != null) {
- ma = new MessageAnnotations(maMap);
+ EncoderImpl encoder = tlsCodec.get().encoder;
+ encoder.setByteBuffer(buffer);
+
+ if (header != null) {
+ encoder.writeObject(header);
}
- DeliveryAnnotations da = null;
if (daMap != null) {
- da = new DeliveryAnnotations(daMap);
+ encoder.writeObject(new DeliveryAnnotations(daMap));
+ }
+ if (maMap != null) {
+ encoder.writeObject(new MessageAnnotations(maMap));
+ }
+ if (properties != null) {
+ encoder.writeObject(properties);
}
- ApplicationProperties ap = null;
if (apMap != null) {
- ap = new ApplicationProperties(apMap);
+ encoder.writeObject(new ApplicationProperties(apMap));
+ }
+ if (body != null) {
+ encoder.writeObject(body);
}
- Footer footer = null;
if (footerMap != null) {
- footer = new Footer(footerMap);
+ encoder.writeObject(new Footer(footerMap));
+ }
+
+ return messageFormat;
+ }
+
+   /**
+    * Builds the AMQP body section for the given JMS view of a server message.
+    * <p>
+    * The section type depends on the concrete ServerJMS* wrapper type and on the
+    * JMS_AMQP_ORIGINAL_ENCODING property. When that property cannot be read as a
+    * short the encoding is treated as AMQP_UNKNOWN and the per-type default applies.
+    * For object messages the JMS_AMQP_CONTENT_TYPE property is set on the message
+    * when it is not already present.
+    *
+    * @param message
+    *        the message whose body is converted
+    *
+    * @return the body section, or {@code null} when no section was produced (for
+    *         example an original encoding of AMQP_NULL)
+    *
+    * @throws JMSException
+    *         if reading the body or a property of the message fails
+    */
+   private Section convertBody(ServerJMSMessage message) throws JMSException {
+
+      Section body = null;
+      short originalEncoding = AMQP_UNKNOWN;
+
+      try {
+         originalEncoding = message.getShortProperty(JMS_AMQP_ORIGINAL_ENCODING);
+      } catch (Exception ex) {
+         // Ignore and stick with UNKNOWN
+      }
+
+      if (message instanceof ServerJMSBytesMessage) {
+         Binary payload = getBinaryFromMessageBody((ServerJMSBytesMessage) message);
+
+         if (payload == null) {
+            payload = EMPTY_BINARY;
+         }
+
+         switch (originalEncoding) {
+            case AMQP_NULL:
+               break;
+            case AMQP_VALUE_BINARY:
+               body = new AmqpValue(payload);
+               break;
+            case AMQP_DATA:
+            case AMQP_UNKNOWN:
+            default:
+               body = new Data(payload);
+               break;
+         }
+      } else if (message instanceof ServerJMSTextMessage) {
+         switch (originalEncoding) {
+            case AMQP_NULL:
+               break;
+            case AMQP_DATA:
+               body = new Data(getBinaryFromMessageBody((ServerJMSTextMessage) message));
+               break;
+            case AMQP_VALUE_STRING:
+            case AMQP_UNKNOWN:
+            default:
+               body = new AmqpValue(((TextMessage) message).getText());
+               break;
+         }
+      } else if (message instanceof ServerJMSMapMessage) {
+         body = new AmqpValue(getMapFromMessageBody((ServerJMSMapMessage) message));
+      } else if (message instanceof ServerJMSStreamMessage) {
+         ArrayList<Object> list = new ArrayList<>();
+         final ServerJMSStreamMessage m = (ServerJMSStreamMessage) message;
+         try {
+            while (true) {
+               list.add(m.readObject());
+            }
+         } catch (MessageEOFException e) {
+            // Expected: this exception is the only exit from the read loop above
+            // and marks the end of the stream content.
+         }
+
+         // Deprecated encoding markers - TODO - Remove on future release
+         if (originalEncoding == AMQP_UNKNOWN) {
+            String amqpType = message.getStringProperty(AMQPMessageTypes.AMQP_TYPE_KEY);
+            if (amqpType != null) {
+               if (amqpType.equals(AMQPMessageTypes.AMQP_LIST)) {
+                  originalEncoding = AMQP_VALUE_LIST;
+               } else {
+                  originalEncoding = AMQP_SEQUENCE;
+               }
+            }
+         }
+
+         switch (originalEncoding) {
+            case AMQP_SEQUENCE:
+               body = new AmqpSequence(list);
+               break;
+            case AMQP_VALUE_LIST:
+            case AMQP_UNKNOWN:
+            default:
+               body = new AmqpValue(list);
+               break;
+         }
+      } else if (message instanceof ServerJMSObjectMessage) {
+         Binary payload = getBinaryFromMessageBody((ServerJMSObjectMessage) message);
+
+         if (payload == null) {
+            payload = EMPTY_BINARY;
+         }
+
+         switch (originalEncoding) {
+            case AMQP_VALUE_BINARY:
+               body = new AmqpValue(payload);
+               break;
+            case AMQP_DATA:
+            case AMQP_UNKNOWN:
+            default:
+               body = new Data(payload);
+               break;
+         }
+
+         // For a non-AMQP message we tag the outbound content type as containing
+         // a serialized Java object so that an AMQP client has a hint as to what
+         // we are sending it.
+         if (!message.propertyExists(JMS_AMQP_CONTENT_TYPE)) {
+            message.setStringProperty(JMS_AMQP_CONTENT_TYPE, SERIALIZED_JAVA_OBJECT_CONTENT_TYPE);
+         }
+      } else if (message instanceof ServerJMSMessage) {
+         // If this is not an AMQP message that was converted then the original encoding
+         // will be unknown so we check for special cases of messages with special data
+         // encoded into the server message body.
+         if (originalEncoding == AMQP_UNKNOWN) {
+            MessageInternal internalMessage = message.getInnerMessage();
+            // Remember the read position so the probe below leaves the buffer untouched.
+            int readerIndex = internalMessage.getBodyBuffer().readerIndex();
+            try {
+               Object s = internalMessage.getBodyBuffer().readNullableSimpleString();
+               if (s != null) {
+                  body = new AmqpValue(s.toString());
+               }
+            } catch (Throwable ignored) {
+               // Best-effort probe: a body that is not a simple string is not an error.
+               // Pass only (message, throwable); with JBoss Logging an extra leading
+               // String argument is taken as the logger class name, not as the message.
+               logger.debug("Exception ignored during conversion, should be ok!", ignored);
+            } finally {
+               internalMessage.getBodyBuffer().readerIndex(readerIndex);
+            }
+         }
+      }
+
+      return body;
+   }
+
+   /**
+    * Copies the whole body of a bytes message into a Binary.
+    *
+    * @param message
+    *        the bytes message to read
+    *
+    * @return a Binary wrapping a fresh array of getBodyLength() bytes
+    *
+    * @throws JMSException
+    *         if the body cannot be read
+    */
+   private Binary getBinaryFromMessageBody(ServerJMSBytesMessage message) throws JMSException {
+      final int length = (int) message.getBodyLength();
+      final byte[] payload = new byte[length];
+
+      message.readBytes(payload);
+      // Reset after the read so that later readBytes calls on this message
+      // are not left positioned at the end of the body.
+      message.reset();
+
+      return new Binary(payload);
+   }
+
+   /**
+    * Encodes the text of a text message as UTF-8 bytes.
+    *
+    * @param message
+    *        the text message to read
+    *
+    * @return a Binary holding the UTF-8 encoded text, or {@code null} when the
+    *         message has no text
+    *
+    * @throws JMSException
+    *         if the text cannot be read
+    */
+   private Binary getBinaryFromMessageBody(ServerJMSTextMessage message) throws JMSException {
+      final String text = message.getText();
+      if (text == null) {
+         return null;
+      }
+
+      return new Binary(text.getBytes(StandardCharsets.UTF_8));
+   }
+
+   /**
+    * Extracts the payload of an object message from the inner message body buffer.
+    * <p>
+    * The reader index is reset, an int length is read, and then that many bytes
+    * are read as the payload.
+    *
+    * @param message
+    *        the object message to read
+    *
+    * @return a Binary wrapping the bytes that follow the length prefix
+    *
+    * @throws JMSException
+    *         declared for symmetry with the other overloads
+    */
+   private Binary getBinaryFromMessageBody(ServerJMSObjectMessage message) throws JMSException {
+      final MessageInternal inner = message.getInnerMessage();
+
+      inner.getBodyBuffer().resetReaderIndex();
+      final byte[] objectBytes = new byte[inner.getBodyBuffer().readInt()];
+      inner.getBodyBuffer().readBytes(objectBytes);
+
+      return new Binary(objectBytes);
+   }
+
+ private Map<String, Object> getMapFromMessageBody(ServerJMSMapMessage message) throws JMSException {
+ final HashMap<String, Object> map = new LinkedHashMap<>();
+
+ @SuppressWarnings("unchecked")
+ final Enumeration<String> names = message.getMapNames();
+ while (names.hasMoreElements()) {
+ String key = names.nextElement();
+ Object value = message.getObject(key);
+ if (value instanceof byte[]) {
+ value = new Binary((byte[]) value);
+ }
+ map.put(key, value);
}
- return (ProtonJMessage) org.apache.qpid.proton.message.Message.Factory.create(header, da, ma, props, ap, body, footer);
+ return map;
}
private static byte destinationType(Destination destination) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/62627bf2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSVendor.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSVendor.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSVendor.java
deleted file mode 100644
index 9a0ed63..0000000
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSVendor.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.protocol.amqp.converter.message;
-
-import javax.jms.BytesMessage;
-import javax.jms.Destination;
-import javax.jms.MapMessage;
-import javax.jms.Message;
-import javax.jms.ObjectMessage;
-import javax.jms.StreamMessage;
-import javax.jms.TextMessage;
-
-public interface JMSVendor {
-
- BytesMessage createBytesMessage();
-
- StreamMessage createStreamMessage();
-
- Message createMessage();
-
- TextMessage createTextMessage();
-
- ObjectMessage createObjectMessage();
-
- MapMessage createMapMessage();
-
- void setJMSXUserID(Message message, String value);
-
- Destination createDestination(String name);
-
- void setJMSXGroupID(Message message, String groupId);
-
- void setJMSXGroupSequence(Message message, int value);
-
- void setJMSXDeliveryCount(Message message, long value);
-
- String toAddress(Destination destination);
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/62627bf2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/OutboundTransformer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/OutboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/OutboundTransformer.java
index 310d4ba..f15490f 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/OutboundTransformer.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/OutboundTransformer.java
@@ -1,13 +1,13 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,54 +16,36 @@
*/
package org.apache.activemq.artemis.protocol.amqp.converter.message;
-public abstract class OutboundTransformer {
-
- JMSVendor vendor;
- String prefixVendor;
+import java.io.UnsupportedEncodingException;
- String prefixDeliveryAnnotations = "DA_";
- String prefixMessageAnnotations = "MA_";
- String prefixFooter = "FT_";
+import javax.jms.JMSException;
- String messageFormatKey;
- String nativeKey;
- String firstAcquirerKey;
- String prefixDeliveryAnnotationsKey;
- String prefixMessageAnnotationsKey;
- String contentTypeKey;
- String contentEncodingKey;
- String replyToGroupIDKey;
- String prefixFooterKey;
+import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage;
+import org.apache.activemq.artemis.utils.IDGenerator;
+import org.apache.qpid.proton.codec.WritableBuffer;
- public OutboundTransformer(JMSVendor vendor) {
- this.vendor = vendor;
- this.setPrefixVendor("JMS_AMQP_");
- }
-
- public String getPrefixVendor() {
- return prefixVendor;
- }
-
- public void setPrefixVendor(String prefixVendor) {
- this.prefixVendor = prefixVendor;
+public abstract class OutboundTransformer {
- messageFormatKey = prefixVendor + "MESSAGE_FORMAT";
- nativeKey = prefixVendor + "NATIVE";
- firstAcquirerKey = prefixVendor + "FirstAcquirer";
- prefixDeliveryAnnotationsKey = prefixVendor + prefixDeliveryAnnotations;
- prefixMessageAnnotationsKey = prefixVendor + prefixMessageAnnotations;
- contentTypeKey = prefixVendor + "ContentType";
- contentEncodingKey = prefixVendor + "ContentEncoding";
- replyToGroupIDKey = prefixVendor + "ReplyToGroupID";
- prefixFooterKey = prefixVendor + prefixFooter;
+ protected IDGenerator idGenerator;
+ public OutboundTransformer(IDGenerator idGenerator) {
+ this.idGenerator = idGenerator;
}
- public JMSVendor getVendor() {
- return vendor;
- }
+ /**
+ * Given a JMS Message, perform a conversion to an AMQP Message and encode it into a form that
+ * is ready for transmission.
+ *
+ * @param message
+ * the message to transform
+ * @param buffer
+ * the buffer where encoding should write to
+ *
+ * @return the message format key of the encoded message.
+ *
+ * @throws Exception
+ * if an error occurs during message transformation
+ */
+ public abstract long transform(ServerJMSMessage message, WritableBuffer buffer) throws JMSException, UnsupportedEncodingException;
- public void setVendor(JMSVendor vendor) {
- this.vendor = vendor;
- }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/62627bf2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/exceptions/ActiveMQAMQPInvalidContentTypeException.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/exceptions/ActiveMQAMQPInvalidContentTypeException.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/exceptions/ActiveMQAMQPInvalidContentTypeException.java
new file mode 100644
index 0000000..4def92c
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/exceptions/ActiveMQAMQPInvalidContentTypeException.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.exceptions;
+
+import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
+import org.apache.qpid.proton.amqp.transport.AmqpError;
+
+/**
+ * AMQP protocol exception for an invalid content type (per its name; the places
+ * that throw it are not visible here). It is always created with
+ * AmqpError.INTERNAL_ERROR and ActiveMQExceptionType.INTERNAL_ERROR.
+ */
+public class ActiveMQAMQPInvalidContentTypeException extends ActiveMQAMQPException {
+
+ /**
+ * Creates the exception with the given detail message.
+ *
+ * @param message
+ * text describing the failure, handed to the superclass constructor
+ */
+ public ActiveMQAMQPInvalidContentTypeException(String message) {
+ super(AmqpError.INTERNAL_ERROR, message, ActiveMQExceptionType.INTERNAL_ERROR);
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/62627bf2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index 7d401fa..94e6a47 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -19,8 +19,6 @@ package org.apache.activemq.artemis.protocol.amqp.proton;
import java.util.Map;
import java.util.Objects;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.core.transaction.Transaction;
@@ -52,9 +50,11 @@ import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Sender;
-import org.apache.qpid.proton.message.ProtonJMessage;
import org.jboss.logging.Logger;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+
public class ProtonServerSenderContext extends ProtonInitializable implements ProtonDeliveryHandler {
private static final Logger log = Logger.getLogger(ProtonServerSenderContext.class);
@@ -407,33 +407,6 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
return 0;
}
- //encode the message
- ProtonJMessage serverMessage;
- try {
- // This can be done a lot better here
- serverMessage = sessionSPI.encodeMessage(message, deliveryCount);
- } catch (Throwable e) {
- log.warn(e.getMessage(), e);
- throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
- }
-
- return performSend(serverMessage, message);
- }
-
- private static boolean hasCapabilities(Symbol symbol, Source source) {
- if (source != null) {
- if (source.getCapabilities() != null) {
- for (Symbol cap : source.getCapabilities()) {
- if (symbol.equals(cap)) {
- return true;
- }
- }
- }
- }
- return false;
- }
-
- protected int performSend(ProtonJMessage serverMessage, Object context) {
if (!creditsSemaphore.tryAcquire()) {
try {
creditsSemaphore.acquire();
@@ -444,22 +417,32 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
}
}
- //presettle means we can ack the message on the dealer side before we send it, i.e. for browsers
+ // presettle means we can settle the message on the dealer side before we send it, i.e. for browsers
boolean preSettle = sender.getRemoteSenderSettleMode() == SenderSettleMode.SETTLED;
- //we only need a tag if we are going to ack later
+ // we only need a tag if we are going to settle later
byte[] tag = preSettle ? new byte[0] : protonSession.getTag();
ByteBuf nettyBuffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1024);
try {
- serverMessage.encode(new NettyWritable(nettyBuffer));
+ long messageFormat = 0;
+
+ // Encode the Server Message into the given Netty Buffer as an AMQP
+ // Message transformed from the internal message model.
+ try {
+ messageFormat = sessionSPI.encodeMessage(message, deliveryCount, new NettyWritable(nettyBuffer));
+ } catch (Throwable e) {
+ log.warn(e.getMessage(), e);
+ throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
+ }
int size = nettyBuffer.writerIndex();
synchronized (connection.getLock()) {
final Delivery delivery;
delivery = sender.delivery(tag, 0, tag.length);
- delivery.setContext(context);
+ delivery.setMessageFormat((int) messageFormat);
+ delivery.setContext(message);
// this will avoid a copy.. patch provided by Norman using buffer.array()
sender.send(nettyBuffer.array(), nettyBuffer.arrayOffset() + nettyBuffer.readerIndex(), nettyBuffer.readableBytes());
@@ -479,6 +462,19 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
}
}
+   /**
+    * Tells whether the given source lists the given capability.
+    *
+    * @param symbol
+    *        the capability to look for
+    * @param source
+    *        the source to inspect, may be {@code null}
+    *
+    * @return {@code true} when the source has capabilities and one of them equals
+    *         the symbol, {@code false} otherwise
+    */
+   private static boolean hasCapabilities(Symbol symbol, Source source) {
+      if (source == null || source.getCapabilities() == null) {
+         return false;
+      }
+
+      for (Symbol advertised : source.getCapabilities()) {
+         if (symbol.equals(advertised)) {
+            return true;
+         }
+      }
+
+      return false;
+   }
+
private static String createQueueName(String clientId, String pubId) {
return clientId + "." + pubId;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/62627bf2/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java
index 148482e..96ce90e 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java
@@ -16,32 +16,31 @@
*/
package org.apache.activemq.artemis.protocol.amqp.converter;
-import java.io.ByteArrayOutputStream;
+import static org.apache.activemq.artemis.api.core.Message.BYTES_TYPE;
+import static org.apache.activemq.artemis.api.core.Message.MAP_TYPE;
+import static org.apache.activemq.artemis.api.core.Message.STREAM_TYPE;
+import static org.apache.activemq.artemis.api.core.Message.TEXT_TYPE;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.wrapMessage;
+
import java.io.IOException;
-import java.io.ObjectOutputStream;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.server.ServerMessage;
-import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage;
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMapMessage;
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage;
-import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSObjectMessage;
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage;
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSTextMessage;
import org.apache.activemq.artemis.protocol.amqp.converter.message.EncodedMessage;
import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
import org.apache.activemq.artemis.utils.SimpleIDGenerator;
-import org.apache.blacklist.ABadClass;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
@@ -53,10 +52,13 @@ import org.apache.qpid.proton.message.impl.MessageImpl;
import org.junit.Assert;
import org.junit.Test;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+
public class TestConversions extends Assert {
@Test
- public void testObjectMessageWhiteList() throws Exception {
+ public void testAmqpValueOfBooleanIsPassedThrough() throws Exception {
Map<String, Object> mapprop = createPropertiesMap();
ApplicationProperties properties = new ApplicationProperties(mapprop);
MessageImpl message = (MessageImpl) Message.Factory.create();
@@ -73,39 +75,15 @@ public class TestConversions extends Assert {
EncodedMessage encodedMessage = encodeMessage(message);
ProtonMessageConverter converter = new ProtonMessageConverter(new SimpleIDGenerator(0));
- ServerJMSObjectMessage serverMessage = (ServerJMSObjectMessage) converter.inboundJMSType(encodedMessage);
+ ServerMessage serverMessage = converter.inbound(encodedMessage);
- verifyProperties(serverMessage);
+ verifyProperties(new ServerJMSMessage(serverMessage, 0));
- assertEquals(true, serverMessage.getObject());
+ EncodedMessage encoded = (EncodedMessage) converter.outbound(serverMessage, 0);
+ Message amqpMessage = encoded.decode();
- Object obj = converter.outbound((ServerMessage) serverMessage.getInnerMessage(), 0);
-
- AmqpValue value = (AmqpValue) ((Message) obj).getBody();
+ AmqpValue value = (AmqpValue) amqpMessage.getBody();
assertEquals(value.getValue(), true);
-
- }
-
- @Test
- public void testObjectMessageNotOnWhiteList() throws Exception {
-
- ProtonMessageConverter converter = new ProtonMessageConverter(new SimpleIDGenerator(0));
- ServerMessageImpl message = new ServerMessageImpl(1, 1024);
- message.setType((byte) 2);
- ServerJMSObjectMessage serverMessage = new ServerJMSObjectMessage(message, 1024);
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- ObjectOutputStream ois = new ObjectOutputStream(out);
- ois.writeObject(new ABadClass());
- byte[] src = out.toByteArray();
- serverMessage.getInnerMessage().getBodyBuffer().writeInt(src.length);
- serverMessage.getInnerMessage().getBodyBuffer().writeBytes(src);
-
- try {
- converter.outbound((ServerMessage) serverMessage.getInnerMessage(), 0);
- fail("should throw ClassNotFoundException");
- } catch (ClassNotFoundException e) {
- //ignore
- }
}
@Test
@@ -126,22 +104,23 @@ public class TestConversions extends Assert {
EncodedMessage encodedMessage = encodeMessage(message);
ProtonMessageConverter converter = new ProtonMessageConverter(new SimpleIDGenerator(0));
- ServerJMSBytesMessage serverMessage = (ServerJMSBytesMessage) converter.inboundJMSType(encodedMessage);
+ ServerMessage serverMessage = converter.inbound(encodedMessage);
- verifyProperties(serverMessage);
+ ServerJMSBytesMessage bytesMessage = (ServerJMSBytesMessage) wrapMessage(BYTES_TYPE, serverMessage, 0);
- assertEquals(bodyBytes.length, serverMessage.getBodyLength());
+ verifyProperties(bytesMessage);
+
+ assertEquals(bodyBytes.length, bytesMessage.getBodyLength());
byte[] newBodyBytes = new byte[4];
- serverMessage.readBytes(newBodyBytes);
+ bytesMessage.readBytes(newBodyBytes);
Assert.assertArrayEquals(bodyBytes, newBodyBytes);
- Object obj = converter.outbound((ServerMessage) serverMessage.getInnerMessage(), 0);
+ Object obj = converter.outbound(serverMessage, 0);
System.out.println("output = " + obj);
-
}
private void verifyProperties(javax.jms.Message message) throws Exception {
@@ -175,25 +154,25 @@ public class TestConversions extends Assert {
EncodedMessage encodedMessage = encodeMessage(message);
ProtonMessageConverter converter = new ProtonMessageConverter(new SimpleIDGenerator(0));
- ServerJMSMapMessage serverMessage = (ServerJMSMapMessage) converter.inboundJMSType(encodedMessage);
+ ServerMessage serverMessage = converter.inbound(encodedMessage);
- verifyProperties(serverMessage);
+ ServerJMSMapMessage mapMessage = (ServerJMSMapMessage) wrapMessage(MAP_TYPE, serverMessage, 0);
+ mapMessage.decode();
- Assert.assertEquals(1, serverMessage.getInt("someint"));
- Assert.assertEquals("value", serverMessage.getString("somestr"));
+ verifyProperties(mapMessage);
- Object obj = converter.outbound((ServerMessage) serverMessage.getInnerMessage(), 0);
+ Assert.assertEquals(1, mapMessage.getInt("someint"));
+ Assert.assertEquals("value", mapMessage.getString("somestr"));
- reEncodeMsg(obj);
+ EncodedMessage encoded = (EncodedMessage) converter.outbound(serverMessage, 0);
+ Message amqpMessage = encoded.decode();
- MessageImpl outMessage = (MessageImpl) obj;
- AmqpValue value = (AmqpValue) outMessage.getBody();
- Map mapoutput = (Map) value.getValue();
+ AmqpValue value = (AmqpValue) amqpMessage.getBody();
+ Map<?, ?> mapoutput = (Map<?, ?>) value.getValue();
assertEquals(Integer.valueOf(1), mapoutput.get("someint"));
- System.out.println("output = " + obj);
-
+ System.out.println("output = " + amqpMessage);
}
@Test
@@ -212,26 +191,25 @@ public class TestConversions extends Assert {
EncodedMessage encodedMessage = encodeMessage(message);
ProtonMessageConverter converter = new ProtonMessageConverter(new SimpleIDGenerator(0));
- ServerJMSStreamMessage serverMessage = (ServerJMSStreamMessage) converter.inboundJMSType(encodedMessage);
+ ServerMessage serverMessage = converter.inbound(encodedMessage);
simulatePersistence(serverMessage);
- verifyProperties(serverMessage);
+ ServerJMSStreamMessage streamMessage = (ServerJMSStreamMessage) wrapMessage(STREAM_TYPE, serverMessage, 0);
- serverMessage.reset();
+ verifyProperties(streamMessage);
- assertEquals(10, serverMessage.readInt());
- assertEquals("10", serverMessage.readString());
+ streamMessage.reset();
- Object obj = converter.outbound((ServerMessage) serverMessage.getInnerMessage(), 0);
+ assertEquals(10, streamMessage.readInt());
+ assertEquals("10", streamMessage.readString());
- reEncodeMsg(obj);
+ EncodedMessage encoded = (EncodedMessage) converter.outbound(serverMessage, 0);
+ Message amqpMessage = encoded.decode();
- MessageImpl outMessage = (MessageImpl) obj;
- List list = ((AmqpSequence) outMessage.getBody()).getValue();
+ List<?> list = ((AmqpSequence) amqpMessage.getBody()).getValue();
Assert.assertEquals(Integer.valueOf(10), list.get(0));
Assert.assertEquals("10", list.get(1));
-
}
@Test
@@ -247,33 +225,33 @@ public class TestConversions extends Assert {
EncodedMessage encodedMessage = encodeMessage(message);
ProtonMessageConverter converter = new ProtonMessageConverter(new SimpleIDGenerator(0));
- ServerJMSTextMessage serverMessage = (ServerJMSTextMessage) converter.inboundJMSType(encodedMessage);
+ ServerMessage serverMessage = converter.inbound(encodedMessage);
simulatePersistence(serverMessage);
- verifyProperties(serverMessage);
+ ServerJMSTextMessage textMessage = (ServerJMSTextMessage) wrapMessage(TEXT_TYPE, serverMessage, 0);
+ textMessage.decode();
- Assert.assertEquals(text, serverMessage.getText());
+ verifyProperties(textMessage);
- Object obj = converter.outbound((ServerMessage) serverMessage.getInnerMessage(), 0);
+ Assert.assertEquals(text, textMessage.getText());
- reEncodeMsg(obj);
+ EncodedMessage encoded = (EncodedMessage) converter.outbound(serverMessage, 0);
+ Message amqpMessage = encoded.decode();
- MessageImpl outMessage = (MessageImpl) obj;
- AmqpValue value = (AmqpValue) outMessage.getBody();
+ AmqpValue value = (AmqpValue) amqpMessage.getBody();
String textValue = (String) value.getValue();
Assert.assertEquals(text, textValue);
- System.out.println("output = " + obj);
-
+ System.out.println("output = " + amqpMessage);
}
- private void simulatePersistence(ServerJMSMessage serverMessage) {
- serverMessage.getInnerMessage().setAddress(new SimpleString("jms.queue.SomeAddress"));
+ private void simulatePersistence(ServerMessage serverMessage) {
+ serverMessage.setAddress(new SimpleString("jms.queue.SomeAddress"));
// This is just to simulate what would happen during the persistence of the message
// We need to still be able to recover the message when we read it back
- ((EncodingSupport) serverMessage.getInnerMessage()).encode(new EmptyBuffer());
+ ((EncodingSupport) serverMessage).encode(new EmptyBuffer());
}
private ProtonJMessage reEncodeMsg(Object obj) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/62627bf2/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPContentTypeSupportTest.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPContentTypeSupportTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPContentTypeSupportTest.java
new file mode 100644
index 0000000..4caead7
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPContentTypeSupportTest.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.converter.message;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInvalidContentTypeException;
+import org.junit.Test;
+
+public class AMQPContentTypeSupportTest { // unit tests for AMQPContentTypeSupport.parseContentTypeForTextualCharset(String)
+
+ @Test(expected = ActiveMQAMQPInvalidContentTypeException.class)
+ public void testParseContentTypeWithOnlyType() throws Exception {
+ doParseContentTypeTestImpl("type", null); // no '/' and no subtype; must throw, so the expected charset argument is never checked
+ }
+
+ @Test(expected = ActiveMQAMQPInvalidContentTypeException.class)
+ public void testParseContentTypeEndsWithSlash() throws Exception {
+ doParseContentTypeTestImpl("type/", null); // nothing after the '/'; must throw
+ }
+
+ @Test(expected = ActiveMQAMQPInvalidContentTypeException.class)
+ public void testParseContentTypeMissingSubtype() throws Exception {
+ doParseContentTypeTestImpl("type/;", null); // empty subtype between '/' and ';'; must throw
+ }
+
+ @Test(expected = ActiveMQAMQPInvalidContentTypeException.class)
+ public void testParseContentTypeEmptyString() throws Exception {
+ doParseContentTypeTestImpl("", null); // empty content type; must throw
+ }
+
+ @Test(expected = ActiveMQAMQPInvalidContentTypeException.class)
+ public void testParseContentTypeNullString() throws Exception {
+ doParseContentTypeTestImpl(null, null); // null content type; must throw the same exception type as the malformed-input cases above
+ }
+
+ @Test
+ public void testParseContentTypeNoParamsAfterSeparatorNonTextual() throws Exception {
+ // Expect null as this is not a textual type
+ doParseContentTypeTestImpl("type/subtype;", null); // a trailing ';' with no parameters must not throw
+ }
+
+ @Test
+ public void testParseContentTypeNoParamsAfterSeparatorTextualType() throws Exception {
+ doParseContentTypeTestImpl("text/plain;", StandardCharsets.UTF_8); // no charset parameter: UTF-8 is expected
+ }
+
+ @Test
+ public void testParseContentTypeEmptyParamsAfterSeparator() throws Exception {
+ doParseContentTypeTestImpl("text/plain;;", StandardCharsets.UTF_8); // empty parameter segments must not throw; UTF-8 is expected
+ }
+
+ @Test
+ public void testParseContentTypeNoParams() throws Exception {
+ doParseContentTypeTestImpl("text/plain", StandardCharsets.UTF_8); // no parameters at all: UTF-8 is expected
+ }
+
+ @Test
+ public void testParseContentTypeWithCharsetUtf8() throws Exception {
+ doParseContentTypeTestImpl("text/plain;charset=utf-8", StandardCharsets.UTF_8); // charset name given in lower case
+ }
+
+ @Test
+ public void testParseContentTypeWithCharsetAscii() throws Exception {
+ doParseContentTypeTestImpl("text/plain;charset=us-ascii", StandardCharsets.US_ASCII); // explicit charset is the expected result instead of UTF-8
+ }
+
+ @Test
+ public void testParseContentTypeWithMultipleParams() throws Exception {
+ doParseContentTypeTestImpl("text/plain; param=value; charset=us-ascii", StandardCharsets.US_ASCII); // spaces after ';' and an unrelated parameter precede the charset
+ }
+
+ @Test
+ public void testParseContentTypeWithCharsetQuoted() throws Exception {
+ doParseContentTypeTestImpl("text/plain;charset=\"us-ascii\"", StandardCharsets.US_ASCII); // charset value wrapped in double quotes
+ }
+
+ @Test(expected = ActiveMQAMQPInvalidContentTypeException.class)
+ public void testParseContentTypeWithCharsetQuotedEmpty() throws Exception {
+ doParseContentTypeTestImpl("text/plain;charset=\"\"", null); // quoted but empty charset value; must throw
+ }
+
+ @Test(expected = ActiveMQAMQPInvalidContentTypeException.class)
+ public void testParseContentTypeWithCharsetQuoteNotClosed() throws Exception {
+ doParseContentTypeTestImpl("text/plain;charset=\"unclosed", null); // opening quote without a closing quote; must throw
+ }
+
+ @Test(expected = ActiveMQAMQPInvalidContentTypeException.class)
+ public void testParseContentTypeWithCharsetQuoteNotClosedEmpty() throws Exception {
+ doParseContentTypeTestImpl("text/plain;charset=\"", null); // lone opening quote and no value; must throw
+ }
+
+ @Test(expected = ActiveMQAMQPInvalidContentTypeException.class)
+ public void testParseContentTypeWithNoCharsetValue() throws Exception {
+ doParseContentTypeTestImpl("text/plain;charset=", null); // "charset=" with nothing after it; must throw
+ }
+
+ @Test
+ public void testParseContentTypeWithTextPlain() throws Exception { // this and the following tests repeat one matrix per media type: three explicit charsets, then no charset
+ doParseContentTypeTestImpl("text/plain;charset=iso-8859-1", StandardCharsets.ISO_8859_1);
+ doParseContentTypeTestImpl("text/plain;charset=us-ascii", StandardCharsets.US_ASCII);
+ doParseContentTypeTestImpl("text/plain;charset=utf-8", StandardCharsets.UTF_8);
+ doParseContentTypeTestImpl("text/plain", StandardCharsets.UTF_8); // no charset parameter: UTF-8 is expected
+ }
+
+ @Test
+ public void testParseContentTypeWithTextJson() throws Exception {
+ doParseContentTypeTestImpl("text/json;charset=iso-8859-1", StandardCharsets.ISO_8859_1);
+ doParseContentTypeTestImpl("text/json;charset=us-ascii", StandardCharsets.US_ASCII);
+ doParseContentTypeTestImpl("text/json;charset=utf-8", StandardCharsets.UTF_8);
+ doParseContentTypeTestImpl("text/json", StandardCharsets.UTF_8);
+ }
+
+ @Test
+ public void testParseContentTypeWithTextHtml() throws Exception {
+ doParseContentTypeTestImpl("text/html;charset=iso-8859-1", StandardCharsets.ISO_8859_1);
+ doParseContentTypeTestImpl("text/html;charset=us-ascii", StandardCharsets.US_ASCII);
+ doParseContentTypeTestImpl("text/html;charset=utf-8", StandardCharsets.UTF_8);
+ doParseContentTypeTestImpl("text/html", StandardCharsets.UTF_8);
+ }
+
+ @Test
+ public void testParseContentTypeWithTextFoo() throws Exception { // "text/foo": a made-up subtype under "text/" is still expected to yield a charset
+ doParseContentTypeTestImpl("text/foo;charset=iso-8859-1", StandardCharsets.ISO_8859_1);
+ doParseContentTypeTestImpl("text/foo;charset=us-ascii", StandardCharsets.US_ASCII);
+ doParseContentTypeTestImpl("text/foo;charset=utf-8", StandardCharsets.UTF_8);
+ doParseContentTypeTestImpl("text/foo", StandardCharsets.UTF_8);
+ }
+
+ @Test
+ public void testParseContentTypeWithApplicationJson() throws Exception { // from here on: "application/*" types that are expected to yield a charset
+ doParseContentTypeTestImpl("application/json;charset=iso-8859-1", StandardCharsets.ISO_8859_1);
+ doParseContentTypeTestImpl("application/json;charset=us-ascii", StandardCharsets.US_ASCII);
+ doParseContentTypeTestImpl("application/json;charset=utf-8", StandardCharsets.UTF_8);
+ doParseContentTypeTestImpl("application/json", StandardCharsets.UTF_8);
+ }
+
+ @Test
+ public void testParseContentTypeWithApplicationJsonVariant() throws Exception { // subtype with a "+json" suffix
+ doParseContentTypeTestImpl("application/something+json;charset=iso-8859-1", StandardCharsets.ISO_8859_1);
+ doParseContentTypeTestImpl("application/something+json;charset=us-ascii", StandardCharsets.US_ASCII);
+ doParseContentTypeTestImpl("application/something+json;charset=utf-8", StandardCharsets.UTF_8);
+ doParseContentTypeTestImpl("application/something+json", StandardCharsets.UTF_8);
+ }
+
+ @Test
+ public void testParseContentTypeWithApplicationJavascript() throws Exception {
+ doParseContentTypeTestImpl("application/javascript;charset=iso-8859-1", StandardCharsets.ISO_8859_1);
+ doParseContentTypeTestImpl("application/javascript;charset=us-ascii", StandardCharsets.US_ASCII);
+ doParseContentTypeTestImpl("application/javascript;charset=utf-8", StandardCharsets.UTF_8);
+ doParseContentTypeTestImpl("application/javascript", StandardCharsets.UTF_8);
+ }
+
+ @Test
+ public void testParseContentTypeWithApplicationEcmascript() throws Exception {
+ doParseContentTypeTestImpl("application/ecmascript;charset=iso-8859-1", StandardCharsets.ISO_8859_1);
+ doParseContentTypeTestImpl("application/ecmascript;charset=us-ascii", StandardCharsets.US_ASCII);
+ doParseContentTypeTestImpl("application/ecmascript;charset=utf-8", StandardCharsets.UTF_8);
+ doParseContentTypeTestImpl("application/ecmascript", StandardCharsets.UTF_8);
+ }
+
+ @Test
+ public void testParseContentTypeWithApplicationXml() throws Exception {
+ doParseContentTypeTestImpl("application/xml;charset=iso-8859-1", StandardCharsets.ISO_8859_1);
+ doParseContentTypeTestImpl("application/xml;charset=us-ascii", StandardCharsets.US_ASCII);
+ doParseContentTypeTestImpl("application/xml;charset=utf-8", StandardCharsets.UTF_8);
+ doParseContentTypeTestImpl("application/xml", StandardCharsets.UTF_8);
+ }
+
+ @Test
+ public void testParseContentTypeWithApplicationXmlVariant() throws Exception { // subtype with a "+xml" suffix
+ doParseContentTypeTestImpl("application/something+xml;charset=iso-8859-1", StandardCharsets.ISO_8859_1);
+ doParseContentTypeTestImpl("application/something+xml;charset=us-ascii", StandardCharsets.US_ASCII);
+ doParseContentTypeTestImpl("application/something+xml;charset=utf-8", StandardCharsets.UTF_8);
+ doParseContentTypeTestImpl("application/something+xml", StandardCharsets.UTF_8);
+ }
+
+ @Test
+ public void testParseContentTypeWithApplicationXmlDtd() throws Exception {
+ doParseContentTypeTestImpl("application/xml-dtd;charset=iso-8859-1", StandardCharsets.ISO_8859_1);
+ doParseContentTypeTestImpl("application/xml-dtd;charset=us-ascii", StandardCharsets.US_ASCII);
+ doParseContentTypeTestImpl("application/xml-dtd;charset=utf-8", StandardCharsets.UTF_8);
+ doParseContentTypeTestImpl("application/xml-dtd", StandardCharsets.UTF_8);
+ }
+
+ @Test
+ public void testParseContentTypeWithApplicationOtherNotTextual() throws Exception {
+ // Expect null as this is not a textual type
+ doParseContentTypeTestImpl("application/other", null); // unlike the "application/*" cases above, no charset is expected here
+ }
+
+ @Test
+ public void testParseContentTypeWithApplicationOctetStream() throws Exception {
+ // Expect null as this is not a textual type
+ doParseContentTypeTestImpl(AMQPMessageSupport.OCTET_STREAM_CONTENT_TYPE, null); // constant defined in AMQPMessageSupport; its value is not visible in this file
+ }
+
+ @Test
+ public void testParseContentTypeWithApplicationJavaSerialized() throws Exception {
+ // Expect null as this is not a textual type
+ doParseContentTypeTestImpl(AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE, null); // constant defined in AMQPMessageSupport; its value is not visible in this file
+ }
+
+ private void doParseContentTypeTestImpl(String contentType, Charset expected) throws ActiveMQAMQPInvalidContentTypeException { // shared helper: parse contentType and compare the resulting charset with expected
+ Charset charset = AMQPContentTypeSupport.parseContentTypeForTextualCharset(contentType); // the exception propagates from here for tests declared with @Test(expected = ...)
+ if (expected == null) { // a null expectation means the parser must return no charset at all
+ assertNull("Expected no charset, but got:" + charset, charset);
+ } else {
+ assertEquals("Charset not as expected", expected, charset);
+ }
+ }
+}