You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@activemq.apache.org by ta...@apache.org on 2016/09/26 22:45:16 UTC
[2/3] activemq git commit:
https://issues.apache.org/jira/browse/AMQ-6438
http://git-wip-us.apache.org/repos/asf/activemq/blob/63d62a71/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java
index 59c306f..985f4f5 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java
@@ -18,40 +18,63 @@ package org.apache.activemq.transport.amqp.message;
import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_DATA;
import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_NULL;
-import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_ORIGINAL_ENCODING_KEY;
import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_SEQUENCE;
import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_UNKNOWN;
import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_VALUE_BINARY;
import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_VALUE_LIST;
import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_VALUE_STRING;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.CONTENT_ENCODING;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.CONTENT_TYPE;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.DELIVERY_ANNOTATION_PREFIX;
import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.EMPTY_BINARY;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.FIRST_ACQUIRER;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.FOOTER_PREFIX;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.HEADER;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_CONTENT_TYPE;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_DELIVERY_ANNOTATION_PREFIX;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_FOOTER_PREFIX;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_MESSAGE_ANNOTATION_PREFIX;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_ORIGINAL_ENCODING;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_PREFIX;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_PREFIX_LENGTH;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.MESSAGE_ANNOTATION_PREFIX;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.MESSAGE_FORMAT;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.NATIVE;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.ORIGINAL_ENCODING;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.PROPERTIES;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.REPLYTO_GROUP_ID;
import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.getBinaryFromMessageBody;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.getMapFromMessageBody;
-import java.io.UnsupportedEncodingException;
-import java.nio.ByteBuffer;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Date;
-import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
-import javax.jms.BytesMessage;
-import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
-import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageEOFException;
-import javax.jms.MessageFormatException;
-import javax.jms.ObjectMessage;
import javax.jms.Queue;
-import javax.jms.StreamMessage;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import javax.jms.TextMessage;
import javax.jms.Topic;
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMapMessage;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQObjectMessage;
+import org.apache.activemq.command.ActiveMQStreamMessage;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.MessageId;
import org.apache.activemq.transport.amqp.AmqpProtocolException;
+import org.apache.activemq.util.JMSExceptionSupport;
+import org.apache.activemq.util.TypeConversionSupport;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.UnsignedByte;
@@ -66,12 +89,12 @@ import org.apache.qpid.proton.amqp.messaging.Header;
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.amqp.messaging.Properties;
import org.apache.qpid.proton.amqp.messaging.Section;
-import org.apache.qpid.proton.codec.CompositeWritableBuffer;
-import org.apache.qpid.proton.codec.DroppingWritableBuffer;
-import org.apache.qpid.proton.codec.WritableBuffer;
-import org.apache.qpid.proton.message.ProtonJMessage;
+import org.apache.qpid.proton.codec.AMQPDefinedTypes;
+import org.apache.qpid.proton.codec.DecoderImpl;
+import org.apache.qpid.proton.codec.EncoderImpl;
+import org.fusesource.hawtbuf.UTF8Buffer;
-public class JMSMappingOutboundTransformer extends OutboundTransformer {
+public class JMSMappingOutboundTransformer implements OutboundTransformer {
public static final Symbol JMS_DEST_TYPE_MSG_ANNOTATION = Symbol.valueOf("x-opt-jms-dest");
public static final Symbol JMS_REPLY_TO_TYPE_MSG_ANNOTATION = Symbol.valueOf("x-opt-jms-reply-to");
@@ -81,225 +104,276 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
public static final byte TEMP_QUEUE_TYPE = 0x02;
public static final byte TEMP_TOPIC_TYPE = 0x03;
- // Deprecated legacy values used by old QPid AMQP 1.0 JMS client.
-
- public static final Symbol LEGACY_JMS_DEST_TYPE_MSG_ANNOTATION = Symbol.valueOf("x-opt-to-type");
- public static final Symbol LEGACY_JMS_REPLY_TO_TYPE_MSG_ANNOTATION = Symbol.valueOf("x-opt-reply-type");
-
- public static final String LEGACY_QUEUE_TYPE = "queue";
- public static final String LEGACY_TOPIC_TYPE = "topic";
- public static final String LEGACY_TEMP_QUEUE_TYPE = "temporary,queue";
- public static final String LEGACY_TEMP_TOPIC_TYPE = "temporary,topic";
-
- public JMSMappingOutboundTransformer(ActiveMQJMSVendor vendor) {
- super(vendor);
+ // For now Proton requires that we create a decoder to create an encoder
+ private final DecoderImpl decoder = new DecoderImpl();
+ private final EncoderImpl encoder = new EncoderImpl(decoder);
+ {
+ AMQPDefinedTypes.registerAllTypes(decoder, encoder);
}
@Override
- public EncodedMessage transform(Message msg) throws Exception {
- if (msg == null) {
- return null;
- }
-
- try {
- if (msg.getBooleanProperty(prefixVendor + "NATIVE")) {
- return null;
- }
- } catch (MessageFormatException e) {
+ public EncodedMessage transform(ActiveMQMessage message) throws Exception {
+ if (message == null) {
return null;
}
- ProtonJMessage amqp = convert(msg);
-
- long messageFormat;
- try {
- messageFormat = msg.getLongProperty(this.messageFormatKey);
- } catch (MessageFormatException e) {
- return null;
- }
-
- ByteBuffer buffer = ByteBuffer.wrap(new byte[1024 * 4]);
- final DroppingWritableBuffer overflow = new DroppingWritableBuffer();
- int c = amqp.encode(new CompositeWritableBuffer(new WritableBuffer.ByteBufferWrapper(buffer), overflow));
- if (overflow.position() > 0) {
- buffer = ByteBuffer.wrap(new byte[1024 * 4 + overflow.position()]);
- c = amqp.encode(new WritableBuffer.ByteBufferWrapper(buffer));
- }
-
- return new EncodedMessage(messageFormat, buffer.array(), 0, c);
- }
-
- /**
- * Perform the conversion between JMS Message and Proton Message without
- * re-encoding it to array. This is needed because some frameworks may elect
- * to do this on their own way.
- *
- * @param message
- * The message to transform into an AMQP version for dispatch.
- *
- * @return an AMQP Message that represents the given JMS Message.
- *
- * @throws Exception if an error occurs during the conversion.
- */
- public ProtonJMessage convert(Message message) throws JMSException, UnsupportedEncodingException {
- Header header = new Header();
- Properties props = new Properties();
+ long messageFormat = 0;
+ Header header = null;
+ Properties properties = null;
Map<Symbol, Object> daMap = null;
Map<Symbol, Object> maMap = null;
Map<String,Object> apMap = null;
Map<Object, Object> footerMap = null;
- Section body = null;
- body = convertBody(message);
+ Section body = convertBody(message);
- header.setDurable(message.getJMSDeliveryMode() == DeliveryMode.PERSISTENT ? true : false);
- header.setPriority(new UnsignedByte((byte) message.getJMSPriority()));
- if (message.getJMSType() != null) {
- props.setSubject(message.getJMSType());
+ if (message.isPersistent()) {
+ if (header == null) {
+ header = new Header();
+ }
+ header.setDurable(true);
}
- if (message.getJMSMessageID() != null) {
- props.setMessageId(vendor.getOriginalMessageId(message));
+ byte priority = message.getPriority();
+ if (priority != Message.DEFAULT_PRIORITY) {
+ if (header == null) {
+ header = new Header();
+ }
+ header.setPriority(new UnsignedByte(priority));
+ }
+ String type = message.getType();
+ if (type != null) {
+ if (properties == null) {
+ properties = new Properties();
+ }
+ properties.setSubject(type);
+ }
+ MessageId messageId = message.getMessageId();
+ if (messageId != null) {
+ if (properties == null) {
+ properties = new Properties();
+ }
+ properties.setMessageId(getOriginalMessageId(message));
}
- if (message.getJMSDestination() != null) {
- props.setTo(vendor.toAddress(message.getJMSDestination()));
+ ActiveMQDestination destination = message.getDestination();
+ if (destination != null) {
+ if (properties == null) {
+ properties = new Properties();
+ }
+ properties.setTo(destination.getQualifiedName());
if (maMap == null) {
maMap = new HashMap<Symbol, Object>();
}
- maMap.put(JMS_DEST_TYPE_MSG_ANNOTATION, destinationType(message.getJMSDestination()));
-
- // Deprecated: used by legacy QPid AMQP 1.0 JMS client
- maMap.put(LEGACY_JMS_DEST_TYPE_MSG_ANNOTATION, destinationAttributes(message.getJMSDestination()));
+ maMap.put(JMS_DEST_TYPE_MSG_ANNOTATION, destinationType(destination));
}
- if (message.getJMSReplyTo() != null) {
- props.setReplyTo(vendor.toAddress(message.getJMSReplyTo()));
+ ActiveMQDestination replyTo = message.getReplyTo();
+ if (replyTo != null) {
+ if (properties == null) {
+ properties = new Properties();
+ }
+ properties.setReplyTo(replyTo.getQualifiedName());
if (maMap == null) {
maMap = new HashMap<Symbol, Object>();
}
- maMap.put(JMS_REPLY_TO_TYPE_MSG_ANNOTATION, destinationType(message.getJMSReplyTo()));
-
- // Deprecated: used by legacy QPid AMQP 1.0 JMS client
- maMap.put(LEGACY_JMS_REPLY_TO_TYPE_MSG_ANNOTATION, destinationAttributes(message.getJMSReplyTo()));
+ maMap.put(JMS_REPLY_TO_TYPE_MSG_ANNOTATION, destinationType(replyTo));
}
- if (message.getJMSCorrelationID() != null) {
- String correlationId = message.getJMSCorrelationID();
+ String correlationId = message.getCorrelationId();
+ if (correlationId != null) {
+ if (properties == null) {
+ properties = new Properties();
+ }
try {
- props.setCorrelationId(AMQPMessageIdHelper.INSTANCE.toIdObject(correlationId));
+ properties.setCorrelationId(AMQPMessageIdHelper.INSTANCE.toIdObject(correlationId));
} catch (AmqpProtocolException e) {
- props.setCorrelationId(correlationId);
+ properties.setCorrelationId(correlationId);
}
}
- if (message.getJMSExpiration() != 0) {
- long ttl = message.getJMSExpiration() - System.currentTimeMillis();
+ long expiration = message.getExpiration();
+ if (expiration != 0) {
+ long ttl = expiration - System.currentTimeMillis();
if (ttl < 0) {
ttl = 1;
}
+
+ if (header == null) {
+ header = new Header();
+ }
header.setTtl(new UnsignedInteger((int) ttl));
- props.setAbsoluteExpiryTime(new Date(message.getJMSExpiration()));
+ if (properties == null) {
+ properties = new Properties();
+ }
+ properties.setAbsoluteExpiryTime(new Date(expiration));
}
- if (message.getJMSTimestamp() != 0) {
- props.setCreationTime(new Date(message.getJMSTimestamp()));
+ long timeStamp = message.getTimestamp();
+ if (timeStamp != 0) {
+ if (properties == null) {
+ properties = new Properties();
+ }
+ properties.setCreationTime(new Date(timeStamp));
}
- @SuppressWarnings("unchecked")
- final Enumeration<String> keys = message.getPropertyNames();
-
- while (keys.hasMoreElements()) {
- String key = keys.nextElement();
- if (key.equals(messageFormatKey) || key.equals(nativeKey) || key.equals(AMQP_ORIGINAL_ENCODING_KEY)) {
- // skip transformer appended properties
- } else if (key.equals(firstAcquirerKey)) {
- header.setFirstAcquirer(message.getBooleanProperty(key));
- } else if (key.startsWith("JMSXDeliveryCount")) {
- // The AMQP delivery-count field only includes prior failed delivery attempts,
- // whereas JMSXDeliveryCount includes the first/current delivery attempt.
- int amqpDeliveryCount = message.getIntProperty(key) - 1;
- if (amqpDeliveryCount > 0) {
- header.setDeliveryCount(new UnsignedInteger(amqpDeliveryCount));
- }
- } else if (key.startsWith("JMSXUserID")) {
- String value = message.getStringProperty(key);
- props.setUserId(new Binary(value.getBytes("UTF-8")));
- } else if (key.startsWith("JMSXGroupID")) {
- String value = message.getStringProperty(key);
- props.setGroupId(value);
- if (apMap == null) {
- apMap = new HashMap<String, Object>();
- }
- apMap.put(key, value);
- } else if (key.startsWith("JMSXGroupSeq")) {
- UnsignedInteger value = new UnsignedInteger(message.getIntProperty(key));
- props.setGroupSequence(value);
- if (apMap == null) {
- apMap = new HashMap<String, Object>();
- }
- apMap.put(key, value);
- } else if (key.startsWith(prefixDeliveryAnnotationsKey)) {
- if (daMap == null) {
- daMap = new HashMap<Symbol, Object>();
- }
- String name = key.substring(prefixDeliveryAnnotationsKey.length());
- daMap.put(Symbol.valueOf(name), message.getObjectProperty(key));
- } else if (key.startsWith(prefixMessageAnnotationsKey)) {
- if (maMap == null) {
- maMap = new HashMap<Symbol, Object>();
- }
- String name = key.substring(prefixMessageAnnotationsKey.length());
- maMap.put(Symbol.valueOf(name), message.getObjectProperty(key));
- } else if (key.equals(contentTypeKey)) {
- props.setContentType(Symbol.getSymbol(message.getStringProperty(key)));
- } else if (key.equals(contentEncodingKey)) {
- props.setContentEncoding(Symbol.getSymbol(message.getStringProperty(key)));
- } else if (key.equals(replyToGroupIDKey)) {
- props.setReplyToGroupId(message.getStringProperty(key));
- } else if (key.startsWith(prefixFooterKey)) {
- if (footerMap == null) {
- footerMap = new HashMap<Object, Object>();
- }
- String name = key.substring(prefixFooterKey.length());
- footerMap.put(name, message.getObjectProperty(key));
- } else {
- if (apMap == null) {
- apMap = new HashMap<String, Object>();
+ // JMSX Message Properties
+ int deliveryCount = message.getRedeliveryCounter();
+ if (deliveryCount > 0) {
+ if (header == null) {
+ header = new Header();
+ }
+ header.setDeliveryCount(new UnsignedInteger(deliveryCount));
+ }
+ String userId = message.getUserID();
+ if (userId != null) {
+ if (properties == null) {
+ properties = new Properties();
+ }
+ properties.setUserId(new Binary(userId.getBytes(StandardCharsets.UTF_8)));
+ }
+ String groupId = message.getGroupID();
+ if (groupId != null) {
+ if (properties == null) {
+ properties = new Properties();
+ }
+ properties.setGroupId(groupId);
+ }
+ int groupSequence = message.getGroupSequence();
+ if (groupSequence > 0) {
+ UnsignedInteger value = new UnsignedInteger(groupSequence);
+ if (properties == null) {
+ properties = new Properties();
+ }
+ properties.setGroupSequence(value);
+ }
+
+ final Map<String, Object> entries;
+ try {
+ entries = message.getProperties();
+ } catch (IOException e) {
+ throw JMSExceptionSupport.create(e);
+ }
+
+ for (Map.Entry<String, Object> entry : entries.entrySet()) {
+ String key = entry.getKey();
+ Object value = entry.getValue();
+ if (value instanceof UTF8Buffer) {
+ value = value.toString();
+ }
+
+ if (key.startsWith(JMS_AMQP_PREFIX)) {
+ if (key.startsWith(NATIVE, JMS_AMQP_PREFIX_LENGTH)) {
+ // skip transformer appended properties
+ continue;
+ } else if (key.startsWith(ORIGINAL_ENCODING, JMS_AMQP_PREFIX_LENGTH)) {
+ // skip transformer appended properties
+ continue;
+ } else if (key.startsWith(MESSAGE_FORMAT, JMS_AMQP_PREFIX_LENGTH)) {
+ messageFormat = (long) TypeConversionSupport.convert(entry.getValue(), Long.class);
+ continue;
+ } else if (key.startsWith(HEADER, JMS_AMQP_PREFIX_LENGTH)) {
+ if (header == null) {
+ header = new Header();
+ }
+ continue;
+ } else if (key.startsWith(PROPERTIES, JMS_AMQP_PREFIX_LENGTH)) {
+ if (properties == null) {
+ properties = new Properties();
+ }
+ continue;
+ } else if (key.startsWith(MESSAGE_ANNOTATION_PREFIX, JMS_AMQP_PREFIX_LENGTH)) {
+ if (maMap == null) {
+ maMap = new HashMap<Symbol, Object>();
+ }
+ String name = key.substring(JMS_AMQP_MESSAGE_ANNOTATION_PREFIX.length());
+ maMap.put(Symbol.valueOf(name), value);
+ continue;
+ } else if (key.startsWith(FIRST_ACQUIRER, JMS_AMQP_PREFIX_LENGTH)) {
+ if (header == null) {
+ header = new Header();
+ }
+ header.setFirstAcquirer((boolean) TypeConversionSupport.convert(value, Boolean.class));
+ continue;
+ } else if (key.startsWith(CONTENT_TYPE, JMS_AMQP_PREFIX_LENGTH)) {
+ if (properties == null) {
+ properties = new Properties();
+ }
+ properties.setContentType(Symbol.getSymbol((String) TypeConversionSupport.convert(value, String.class)));
+ continue;
+ } else if (key.startsWith(CONTENT_ENCODING, JMS_AMQP_PREFIX_LENGTH)) {
+ if (properties == null) {
+ properties = new Properties();
+ }
+ properties.setContentEncoding(Symbol.getSymbol((String) TypeConversionSupport.convert(value, String.class)));
+ continue;
+ } else if (key.startsWith(REPLYTO_GROUP_ID, JMS_AMQP_PREFIX_LENGTH)) {
+ if (properties == null) {
+ properties = new Properties();
+ }
+ properties.setReplyToGroupId((String) TypeConversionSupport.convert(value, String.class));
+ continue;
+ } else if (key.startsWith(DELIVERY_ANNOTATION_PREFIX, JMS_AMQP_PREFIX_LENGTH)) {
+ if (daMap == null) {
+ daMap = new HashMap<Symbol, Object>();
+ }
+ String name = key.substring(JMS_AMQP_DELIVERY_ANNOTATION_PREFIX.length());
+ daMap.put(Symbol.valueOf(name), value);
+ continue;
+ } else if (key.startsWith(FOOTER_PREFIX, JMS_AMQP_PREFIX_LENGTH)) {
+ if (footerMap == null) {
+ footerMap = new HashMap<Object, Object>();
+ }
+ String name = key.substring(JMS_AMQP_FOOTER_PREFIX.length());
+ footerMap.put(name, value);
+ continue;
}
- apMap.put(key, message.getObjectProperty(key));
}
+
+ // The property didn't map into any other slot so we store it in the
+ // Application Properties section of the message.
+ if (apMap == null) {
+ apMap = new HashMap<String, Object>();
+ }
+ apMap.put(key, value);
}
- MessageAnnotations ma = null;
- if (maMap != null) {
- ma = new MessageAnnotations(maMap);
+ final AmqpWritableBuffer buffer = new AmqpWritableBuffer();
+ encoder.setByteBuffer(buffer);
+
+ if (header != null) {
+ encoder.writeObject(header);
}
- DeliveryAnnotations da = null;
if (daMap != null) {
- da = new DeliveryAnnotations(daMap);
+ encoder.writeObject(new DeliveryAnnotations(daMap));
+ }
+ if (maMap != null) {
+ encoder.writeObject(new MessageAnnotations(maMap));
+ }
+ if (properties != null) {
+ encoder.writeObject(properties);
}
- ApplicationProperties ap = null;
if (apMap != null) {
- ap = new ApplicationProperties(apMap);
+ encoder.writeObject(new ApplicationProperties(apMap));
+ }
+ if (body != null) {
+ encoder.writeObject(body);
}
- Footer footer = null;
if (footerMap != null) {
- footer = new Footer(footerMap);
+ encoder.writeObject(new Footer(footerMap));
}
- return (ProtonJMessage) org.apache.qpid.proton.message.Message.Factory.create(header, da, ma, props, ap, body, footer);
+ return new EncodedMessage(messageFormat, buffer.getArray(), 0, buffer.getArrayLength());
}
- private Section convertBody(Message message) throws JMSException {
+ private Section convertBody(ActiveMQMessage message) throws JMSException {
Section body = null;
short orignalEncoding = AMQP_UNKNOWN;
- if (message.propertyExists(AMQP_ORIGINAL_ENCODING_KEY)) {
- try {
- orignalEncoding = message.getShortProperty(AMQP_ORIGINAL_ENCODING_KEY);
- } catch (Exception ex) {
- }
+ try {
+ orignalEncoding = message.getShortProperty(JMS_AMQP_ORIGINAL_ENCODING);
+ } catch (Exception ex) {
+ // Ignore and stick with UNKNOWN
}
- if (message instanceof BytesMessage) {
- Binary payload = vendor.getBinaryFromMessageBody((BytesMessage) message);
+ if (message instanceof ActiveMQBytesMessage) {
+ Binary payload = getBinaryFromMessageBody((ActiveMQBytesMessage) message);
if (payload == null) {
payload = EMPTY_BINARY;
@@ -317,12 +391,12 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
body = new Data(payload);
break;
}
- } else if (message instanceof TextMessage) {
+ } else if (message instanceof ActiveMQTextMessage) {
switch (orignalEncoding) {
case AMQP_NULL:
break;
case AMQP_DATA:
- body = new Data(vendor.getBinaryFromMessageBody((TextMessage) message));
+ body = new Data(getBinaryFromMessageBody((ActiveMQTextMessage) message));
break;
case AMQP_VALUE_STRING:
case AMQP_UNKNOWN:
@@ -330,11 +404,11 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
body = new AmqpValue(((TextMessage) message).getText());
break;
}
- } else if (message instanceof MapMessage) {
- body = new AmqpValue(vendor.getMapFromMessageBody((MapMessage) message));
- } else if (message instanceof StreamMessage) {
+ } else if (message instanceof ActiveMQMapMessage) {
+ body = new AmqpValue(getMapFromMessageBody((ActiveMQMapMessage) message));
+ } else if (message instanceof ActiveMQStreamMessage) {
ArrayList<Object> list = new ArrayList<Object>();
- final StreamMessage m = (StreamMessage) message;
+ final ActiveMQStreamMessage m = (ActiveMQStreamMessage) message;
try {
while (true) {
list.add(m.readObject());
@@ -352,8 +426,8 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
body = new AmqpValue(list);
break;
}
- } else if (message instanceof ObjectMessage) {
- Binary payload = vendor.getBinaryFromMessageBody((ObjectMessage) message);
+ } else if (message instanceof ActiveMQObjectMessage) {
+ Binary payload = getBinaryFromMessageBody((ActiveMQObjectMessage) message);
if (payload == null) {
payload = EMPTY_BINARY;
@@ -373,8 +447,10 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
// For a non-AMQP message we tag the outbound content type as containing
// a serialized Java object so that an AMQP client has a hint as to what
// we are sending it.
- if (!message.propertyExists(contentTypeKey)) {
- vendor.setMessageProperty(message, contentTypeKey, SERIALIZED_JAVA_OBJECT_CONTENT_TYPE);
+ if (!message.propertyExists(JMS_AMQP_CONTENT_TYPE)) {
+ message.setReadOnlyProperties(false);
+ message.setStringProperty(JMS_AMQP_CONTENT_TYPE, SERIALIZED_JAVA_OBJECT_CONTENT_TYPE);
+ message.setReadOnlyProperties(true);
}
}
@@ -399,23 +475,19 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
throw new IllegalArgumentException("Unknown Destination Type passed to JMS Transformer.");
}
- // Used by legacy QPid AMQP 1.0 JMS client.
- @Deprecated
- private static String destinationAttributes(Destination destination) {
- if (destination instanceof Queue) {
- if (destination instanceof TemporaryQueue) {
- return LEGACY_TEMP_QUEUE_TYPE;
- } else {
- return LEGACY_QUEUE_TYPE;
- }
- } else if (destination instanceof Topic) {
- if (destination instanceof TemporaryTopic) {
- return LEGACY_TEMP_TOPIC_TYPE;
- } else {
- return LEGACY_TOPIC_TYPE;
+ private static Object getOriginalMessageId(ActiveMQMessage message) {
+ Object result;
+ MessageId messageId = message.getMessageId();
+ if (messageId.getTextView() != null) {
+ try {
+ result = AMQPMessageIdHelper.INSTANCE.toIdObject(messageId.getTextView());
+ } catch (AmqpProtocolException e) {
+ result = messageId.getTextView();
}
+ } else {
+ result = messageId.toString();
}
- throw new IllegalArgumentException("Unknown Destination Type passed to JMS Transformer.");
+ return result;
}
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/63d62a71/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/OutboundTransformer.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/OutboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/OutboundTransformer.java
index 2eefa50..6ca9ced 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/OutboundTransformer.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/OutboundTransformer.java
@@ -16,54 +16,10 @@
*/
package org.apache.activemq.transport.amqp.message;
-import javax.jms.Message;
+import org.apache.activemq.command.ActiveMQMessage;
-public abstract class OutboundTransformer {
+public interface OutboundTransformer {
- protected final ActiveMQJMSVendor vendor;
+ public abstract EncodedMessage transform(ActiveMQMessage message) throws Exception;
- protected String prefixVendor;
-
- protected String prefixDeliveryAnnotations = "DA_";
- protected String prefixMessageAnnotations= "MA_";
- protected String prefixFooter = "FT_";
-
- protected String messageFormatKey;
- protected String nativeKey;
- protected String firstAcquirerKey;
- protected String prefixDeliveryAnnotationsKey;
- protected String prefixMessageAnnotationsKey;
- protected String contentTypeKey;
- protected String contentEncodingKey;
- protected String replyToGroupIDKey;
- protected String prefixFooterKey;
-
- public OutboundTransformer(ActiveMQJMSVendor vendor) {
- this.vendor = vendor;
- this.setPrefixVendor("JMS_AMQP_");
- }
-
- public abstract EncodedMessage transform(Message jms) throws Exception;
-
- public String getPrefixVendor() {
- return prefixVendor;
- }
-
- public void setPrefixVendor(String prefixVendor) {
- this.prefixVendor = prefixVendor;
-
- messageFormatKey = prefixVendor + "MESSAGE_FORMAT";
- nativeKey = prefixVendor + "NATIVE";
- firstAcquirerKey = prefixVendor + "FirstAcquirer";
- prefixDeliveryAnnotationsKey = prefixVendor + prefixDeliveryAnnotations;
- prefixMessageAnnotationsKey = prefixVendor + prefixMessageAnnotations;
- contentTypeKey = prefixVendor + "ContentType";
- contentEncodingKey = prefixVendor + "ContentEncoding";
- replyToGroupIDKey = prefixVendor + "ReplyToGroupID";
- prefixFooterKey = prefixVendor + prefixFooter;
- }
-
- public ActiveMQJMSVendor getVendor() {
- return vendor;
- }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/63d62a71/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java
index 503a05e..33c319e 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java
@@ -37,7 +37,6 @@ import org.apache.activemq.transport.amqp.AmqpProtocolConverter;
import org.apache.activemq.transport.amqp.ResponseHandler;
import org.apache.activemq.transport.amqp.message.AMQPNativeInboundTransformer;
import org.apache.activemq.transport.amqp.message.AMQPRawInboundTransformer;
-import org.apache.activemq.transport.amqp.message.ActiveMQJMSVendor;
import org.apache.activemq.transport.amqp.message.EncodedMessage;
import org.apache.activemq.transport.amqp.message.InboundTransformer;
import org.apache.activemq.transport.amqp.message.JMSMappingInboundTransformer;
@@ -138,14 +137,14 @@ public class AmqpReceiver extends AmqpAbstractReceiver {
if (inboundTransformer == null) {
String transformer = session.getConnection().getConfiguredTransformer();
if (transformer.equalsIgnoreCase(InboundTransformer.TRANSFORMER_JMS)) {
- inboundTransformer = new JMSMappingInboundTransformer(ActiveMQJMSVendor.INSTANCE);
+ inboundTransformer = new JMSMappingInboundTransformer();
} else if (transformer.equalsIgnoreCase(InboundTransformer.TRANSFORMER_NATIVE)) {
- inboundTransformer = new AMQPNativeInboundTransformer(ActiveMQJMSVendor.INSTANCE);
+ inboundTransformer = new AMQPNativeInboundTransformer();
} else if (transformer.equalsIgnoreCase(InboundTransformer.TRANSFORMER_RAW)) {
- inboundTransformer = new AMQPRawInboundTransformer(ActiveMQJMSVendor.INSTANCE);
+ inboundTransformer = new AMQPRawInboundTransformer();
} else {
LOG.warn("Unknown transformer type {} using native one instead", transformer);
- inboundTransformer = new AMQPNativeInboundTransformer(ActiveMQJMSVendor.INSTANCE);
+ inboundTransformer = new AMQPNativeInboundTransformer();
}
}
return inboundTransformer;
@@ -157,7 +156,7 @@ public class AmqpReceiver extends AmqpAbstractReceiver {
EncodedMessage em = new EncodedMessage(delivery.getMessageFormat(), deliveryBytes.data, deliveryBytes.offset, deliveryBytes.length);
InboundTransformer transformer = getTransformer();
- ActiveMQMessage message = (ActiveMQMessage) transformer.transform(em);
+ ActiveMQMessage message = transformer.transform(em);
current = null;
http://git-wip-us.apache.org/repos/asf/activemq/blob/63d62a71/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
index 455e0b0..2531c1a 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -17,6 +17,7 @@
package org.apache.activemq.transport.amqp.protocol;
import static org.apache.activemq.transport.amqp.AmqpSupport.toLong;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_MESSAGE_FORMAT;
import java.io.IOException;
import java.util.LinkedList;
@@ -39,7 +40,6 @@ import org.apache.activemq.command.Response;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.transport.amqp.AmqpProtocolConverter;
import org.apache.activemq.transport.amqp.ResponseHandler;
-import org.apache.activemq.transport.amqp.message.ActiveMQJMSVendor;
import org.apache.activemq.transport.amqp.message.AutoOutboundTransformer;
import org.apache.activemq.transport.amqp.message.EncodedMessage;
import org.apache.activemq.transport.amqp.message.OutboundTransformer;
@@ -75,11 +75,10 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
private static final byte[] EMPTY_BYTE_ARRAY = new byte[] {};
- private final OutboundTransformer outboundTransformer = new AutoOutboundTransformer(ActiveMQJMSVendor.INSTANCE);
+ private final OutboundTransformer outboundTransformer = new AutoOutboundTransformer();
private final AmqpTransferTagGenerator tagCache = new AmqpTransferTagGenerator();
private final LinkedList<MessageDispatch> outbound = new LinkedList<MessageDispatch>();
private final LinkedList<MessageDispatch> dispatchedInTx = new LinkedList<MessageDispatch>();
- private final String MESSAGE_FORMAT_KEY = outboundTransformer.getPrefixVendor() + "MESSAGE_FORMAT";
private final ConsumerInfo consumerInfo;
private AbstractSubscription subscription;
@@ -437,8 +436,8 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
temp = (ActiveMQMessage) md.getMessage();
}
- if (!temp.getProperties().containsKey(MESSAGE_FORMAT_KEY)) {
- temp.setProperty(MESSAGE_FORMAT_KEY, 0);
+ if (!temp.getProperties().containsKey(JMS_AMQP_MESSAGE_FORMAT)) {
+ temp.setProperty(JMS_AMQP_MESSAGE_FORMAT, 0);
}
}
@@ -477,6 +476,7 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
currentDelivery = getEndpoint().delivery(tag, 0, tag.length);
}
currentDelivery.setContext(md);
+ currentDelivery.setMessageFormat((int) amqp.getMessageFormat());
} else {
// TODO: message could not be generated what now?
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/63d62a71/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTransformerTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTransformerTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTransformerTest.java
index b513c1a..201cee2 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTransformerTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTransformerTest.java
@@ -87,8 +87,6 @@ public class AmqpTransformerTest {
assertTrue(message instanceof BytesMessage);
Boolean nativeTransformationUsed = message.getBooleanProperty("JMS_AMQP_NATIVE");
- Long messageFormat = message.getLongProperty("JMS_AMQP_MESSAGE_FORMAT");
- assertEquals(0L, messageFormat.longValue());
assertTrue("Didn't use the correct transformation, expected NATIVE", nativeTransformationUsed);
assertEquals(DeliveryMode.PERSISTENT, message.getJMSDeliveryMode());
assertEquals(7, message.getJMSPriority());
@@ -136,8 +134,6 @@ public class AmqpTransformerTest {
LOG.info("Recieved message: {}", message);
assertTrue(message instanceof BytesMessage);
Boolean nativeTransformationUsed = message.getBooleanProperty("JMS_AMQP_NATIVE");
- Long messageFormat = message.getLongProperty("JMS_AMQP_MESSAGE_FORMAT");
- assertEquals(0L, messageFormat.longValue());
assertTrue("Didn't use the correct transformation, expected NATIVE", nativeTransformationUsed);
assertEquals(DeliveryMode.PERSISTENT, message.getJMSDeliveryMode());
@@ -184,8 +180,6 @@ public class AmqpTransformerTest {
assertTrue(message instanceof TextMessage);
Boolean nativeTransformationUsed = message.getBooleanProperty("JMS_AMQP_NATIVE");
- Long messageFormat = message.getLongProperty("JMS_AMQP_MESSAGE_FORMAT");
- assertEquals(0L, messageFormat.longValue());
assertFalse("Didn't use the correct transformation, expected NOT to be NATIVE", nativeTransformationUsed);
assertEquals(DeliveryMode.PERSISTENT, message.getJMSDeliveryMode());
http://git-wip-us.apache.org/repos/asf/activemq/blob/63d62a71/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSInteroperabilityTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSInteroperabilityTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSInteroperabilityTest.java
index 84d5864..fa61e14 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSInteroperabilityTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSInteroperabilityTest.java
@@ -31,6 +31,7 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
+import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
@@ -468,4 +469,46 @@ public class JMSInteroperabilityTest extends JMSClientTestSupport {
amqp.close();
openwire.close();
}
+
+ //----- Test Qpid JMS to Qpid JMS interop with transformers --------------//
+
+ @Test
+ public void testQpidJMSToQpidJMSMessageSendReceive() throws Exception {
+ final int SIZE = 1024;
+ final int NUM_MESSAGES = 100;
+
+ Connection amqpSend = createConnection("client-1");
+ Connection amqpReceive = createConnection("client-2");
+
+ amqpReceive.start();
+
+ Session senderSession = amqpSend.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Session receiverSession = amqpReceive.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Destination queue = senderSession.createQueue(getDestinationName());
+
+ MessageProducer amqpProducer = senderSession.createProducer(queue);
+ MessageConsumer amqpConsumer = receiverSession.createConsumer(queue);
+
+ byte[] payload = new byte[SIZE];
+
+ for (int i = 0; i < NUM_MESSAGES; ++i) {
+ BytesMessage outgoing = senderSession.createBytesMessage();
+ outgoing.setLongProperty("SendTime", System.currentTimeMillis());
+ outgoing.writeBytes(payload);
+ amqpProducer.send(outgoing);
+ }
+
+ // Now consume the messages
+ for (int i = 0; i < NUM_MESSAGES; ++i) {
+ Message received = amqpConsumer.receive(2000);
+ assertNotNull(received);
+ assertTrue("Expected BytesMessage but got " + received, received instanceof BytesMessage);
+ BytesMessage incoming = (BytesMessage) received;
+ assertEquals(SIZE, incoming.getBodyLength());
+ }
+
+ amqpReceive.close();
+ amqpSend.close();
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/63d62a71/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
index 2b1b874..e5e1bbd 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
@@ -257,7 +257,7 @@ public class AmqpMessage {
* @return the set message ID in String form or null if not set.
*/
public String getMessageId() {
- if (message.getProperties() == null) {
+ if (message.getProperties() == null || message.getProperties().getMessageId() == null) {
return null;
}
@@ -309,7 +309,7 @@ public class AmqpMessage {
* @return the set correlation ID in String form or null if not set.
*/
public String getCorrelationId() {
- if (message.getProperties() == null) {
+ if (message.getProperties() == null || message.getProperties().getCorrelationId() == null) {
return null;
}
@@ -387,7 +387,7 @@ public class AmqpMessage {
* @return true if the message is marked as being durable.
*/
public boolean isDurable() {
- if (message.getHeader() == null) {
+ if (message.getHeader() == null || message.getHeader().getDurable() == null) {
return false;
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/63d62a71/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformerTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformerTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformerTest.java
index ba0f014..1427b5a 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformerTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformerTest.java
@@ -51,7 +51,6 @@ import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.message.Message;
import org.junit.Test;
-import org.mockito.Mockito;
public class JMSMappingInboundTransformerTest {
@@ -65,8 +64,7 @@ public class JMSMappingInboundTransformerTest {
*/
@Test
public void testCreateBytesMessageFromNoBodySectionAndContentType() throws Exception {
- ActiveMQJMSVendor vendor = createVendor();
- JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
+ JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer();
Message message = Message.Factory.create();
message.setContentType(AmqpMessageSupport.OCTET_STREAM_CONTENT_TYPE);
@@ -86,8 +84,7 @@ public class JMSMappingInboundTransformerTest {
*/
@Test
public void testCreateBytesMessageFromNoBodySectionAndNoContentType() throws Exception {
- ActiveMQJMSVendor vendor = createVendor();
- JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
+ JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer();
Message message = Message.Factory.create();
@@ -107,8 +104,7 @@ public class JMSMappingInboundTransformerTest {
*/
@Test
public void testCreateObjectMessageFromNoBodySectionAndContentType() throws Exception {
- ActiveMQJMSVendor vendor = createVendor();
- JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
+ JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer();
Message message = Message.Factory.create();
message.setContentType(AmqpMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE);
@@ -122,8 +118,7 @@ public class JMSMappingInboundTransformerTest {
@Test
public void testCreateTextMessageFromNoBodySectionAndContentType() throws Exception {
- ActiveMQJMSVendor vendor = createVendor();
- JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
+ JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer();
Message message = Message.Factory.create();
message.setContentType("text/plain");
@@ -143,8 +138,7 @@ public class JMSMappingInboundTransformerTest {
* @throws Exception if an error occurs during the test.
*/
public void testCreateGenericMessageFromNoBodySectionAndUnknownContentType() throws Exception {
- ActiveMQJMSVendor vendor = createVendor();
- JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
+ JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer();
Message message = Message.Factory.create();
message.setContentType("unknown-content-type");
@@ -174,8 +168,7 @@ public class JMSMappingInboundTransformerTest {
EncodedMessage em = encodeMessage(message);
- ActiveMQJMSVendor vendor = createVendor();
- JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
+ JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer();
javax.jms.Message jmsMessage = transformer.transform(em);
assertNotNull("Message should not be null", jmsMessage);
@@ -197,8 +190,7 @@ public class JMSMappingInboundTransformerTest {
EncodedMessage em = encodeMessage(message);
- ActiveMQJMSVendor vendor = createVendor();
- JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
+ JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer();
javax.jms.Message jmsMessage = transformer.transform(em);
assertNotNull("Message should not be null", jmsMessage);
@@ -222,8 +214,7 @@ public class JMSMappingInboundTransformerTest {
EncodedMessage em = encodeMessage(message);
- ActiveMQJMSVendor vendor = createVendor();
- JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
+ JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer();
javax.jms.Message jmsMessage = transformer.transform(em);
assertNotNull("Message should not be null", jmsMessage);
@@ -246,8 +237,7 @@ public class JMSMappingInboundTransformerTest {
EncodedMessage em = encodeMessage(message);
- ActiveMQJMSVendor vendor = createVendor();
- JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
+ JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer();
javax.jms.Message jmsMessage = transformer.transform(em);
assertNotNull("Message should not be null", jmsMessage);
@@ -350,8 +340,7 @@ public class JMSMappingInboundTransformerTest {
EncodedMessage em = encodeMessage(message);
- ActiveMQJMSVendor vendor = createVendor();
- JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
+ JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer();
javax.jms.Message jmsMessage = transformer.transform(em);
assertNotNull("Message should not be null", jmsMessage);
@@ -377,8 +366,7 @@ public class JMSMappingInboundTransformerTest {
EncodedMessage em = encodeMessage(message);
- ActiveMQJMSVendor vendor = createVendor();
- JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
+ JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer();
javax.jms.Message jmsMessage = transformer.transform(em);
assertNotNull("Message should not be null", jmsMessage);
@@ -398,8 +386,7 @@ public class JMSMappingInboundTransformerTest {
EncodedMessage em = encodeMessage(message);
- ActiveMQJMSVendor vendor = createVendor();
- JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
+ JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer();
javax.jms.Message jmsMessage = transformer.transform(em);
assertNotNull("Message should not be null", jmsMessage);
@@ -415,8 +402,7 @@ public class JMSMappingInboundTransformerTest {
*/
@Test
public void testCreateObjectMessageFromAmqpValueWithBinaryAndContentType() throws Exception {
- ActiveMQJMSVendor vendor = createVendor();
- JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
+ JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer();
Message message = Message.Factory.create();
message.setBody(new AmqpValue(new Binary(new byte[0])));
@@ -443,8 +429,7 @@ public class JMSMappingInboundTransformerTest {
EncodedMessage em = encodeMessage(message);
- ActiveMQJMSVendor vendor = createVendor();
- JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
+ JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer();
javax.jms.Message jmsMessage = transformer.transform(em);
assertNotNull("Message should not be null", jmsMessage);
@@ -465,8 +450,7 @@ public class JMSMappingInboundTransformerTest {
EncodedMessage em = encodeMessage(message);
- ActiveMQJMSVendor vendor = createVendor();
- JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
+ JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer();
javax.jms.Message jmsMessage = transformer.transform(em);
assertNotNull("Message should not be null", jmsMessage);
@@ -487,8 +471,7 @@ public class JMSMappingInboundTransformerTest {
EncodedMessage em = encodeMessage(message);
- ActiveMQJMSVendor vendor = createVendor();
- JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
+ JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer();
javax.jms.Message jmsMessage = transformer.transform(em);
assertNotNull("Message should not be null", jmsMessage);
@@ -509,8 +492,7 @@ public class JMSMappingInboundTransformerTest {
EncodedMessage em = encodeMessage(message);
- ActiveMQJMSVendor vendor = createVendor();
- JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
+ JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer();
javax.jms.Message jmsMessage = transformer.transform(em);
assertNotNull("Message should not be null", jmsMessage);
@@ -531,8 +513,7 @@ public class JMSMappingInboundTransformerTest {
EncodedMessage em = encodeMessage(message);
- ActiveMQJMSVendor vendor = createVendor();
- JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
+ JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer();
javax.jms.Message jmsMessage = transformer.transform(em);
@@ -548,8 +529,7 @@ public class JMSMappingInboundTransformerTest {
EncodedMessage em = encodeMessage(message);
- ActiveMQJMSVendor vendor = createVendor();
- JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
+ JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer();
javax.jms.Message jmsMessage = transformer.transform(em);
assertTrue("Expected TextMessage", jmsMessage instanceof TextMessage);
@@ -589,9 +569,7 @@ public class JMSMappingInboundTransformerTest {
}
private void doTransformWithToTypeDestinationTypeAnnotationTestImpl(Object toTypeAnnotationValue, Class<? extends Destination> expectedClass) throws Exception {
- ActiveMQTextMessage mockTextMessage = createMockTextMessage();
- ActiveMQJMSVendor mockVendor = createMockVendor(mockTextMessage);
- JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(mockVendor);
+ JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer();
String toAddress = "toAddress";
Message amqp = Message.Factory.create();
@@ -608,11 +586,6 @@ public class JMSMappingInboundTransformerTest {
javax.jms.Message jmsMessage = transformer.transform(em);
assertTrue("Expected TextMessage", jmsMessage instanceof TextMessage);
-
- // Verify that createDestination was called with the provided 'to'
- // address and 'Destination' class
- // TODO - No need to really test this bit ?
- // Mockito.verify(mockVendor).createDestination(toAddress, expectedClass);
}
//----- ReplyTo Conversions ----------------------------------------------//
@@ -643,9 +616,7 @@ public class JMSMappingInboundTransformerTest {
}
private void doTransformWithReplyToTypeDestinationTypeAnnotationTestImpl(Object replyToTypeAnnotationValue, Class<? extends Destination> expectedClass) throws Exception {
- ActiveMQTextMessage mockTextMessage = createMockTextMessage();
- ActiveMQJMSVendor mockVendor = createMockVendor(mockTextMessage);
- JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(mockVendor);
+ JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer();
String replyToAddress = "replyToAddress";
Message amqp = Message.Factory.create();
@@ -662,31 +633,10 @@ public class JMSMappingInboundTransformerTest {
javax.jms.Message jmsMessage = transformer.transform(em);
assertTrue("Expected TextMessage", jmsMessage instanceof TextMessage);
-
- // Verify that createDestination was called with the provided 'replyTo'
- // address and 'Destination' class
- // TODO - No need to really test this bit ?
- // Mockito.verify(mockVendor).createDestination(replyToAddress, expectedClass);
}
//----- Utility Methods --------------------------------------------------//
- private ActiveMQTextMessage createMockTextMessage() {
- return Mockito.mock(ActiveMQTextMessage.class);
- }
-
- private ActiveMQJMSVendor createMockVendor(ActiveMQTextMessage mockTextMessage) {
- ActiveMQJMSVendor mockVendor = Mockito.mock(ActiveMQJMSVendor.class);
- Mockito.when(mockVendor.createTextMessage()).thenReturn(mockTextMessage);
- Mockito.when(mockVendor.createTextMessage(Mockito.any(String.class))).thenReturn(mockTextMessage);
-
- return mockVendor;
- }
-
- private ActiveMQJMSVendor createVendor() {
- return ActiveMQJMSVendor.INSTANCE;
- }
-
private EncodedMessage encodeMessage(Message message) {
byte[] encodeBuffer = new byte[1024 * 8];
int encodedSize;