You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2012/10/25 15:36:10 UTC
svn commit: r1402148 - in
/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp:
./ transform/
Author: dejanb
Date: Thu Oct 25 13:36:09 2012
New Revision: 1402148
URL: http://svn.apache.org/viewvc?rev=1402148&view=rev
Log:
amqp - support configurable transformers and populate message properties for the default native one
Added:
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPRawInboundTransformer.java
- copied, changed from r1401957, activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeInboundTransformer.java
Modified:
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeInboundTransformer.java
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/EncodedMessage.java
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/InboundTransformer.java
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingInboundTransformer.java
Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java?rev=1402148&r1=1402147&r2=1402148&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java Thu Oct 25 13:36:09 2012
@@ -363,8 +363,24 @@ class AmqpProtocolConverter {
}
}
- //InboundTransformer inboundTransformer = new AMQPNativeInboundTransformer(ActiveMQJMSVendor.INSTANCE);
- InboundTransformer inboundTransformer = new JMSMappingInboundTransformer(ActiveMQJMSVendor.INSTANCE);
+ InboundTransformer inboundTransformer;
+
+ protected InboundTransformer getInboundTransformer() {
+ if (inboundTransformer == null) {
+ String transformer = amqpTransport.getTransformer();
+ if (transformer.equals(InboundTransformer.TRANSFORMER_JMS)) {
+ inboundTransformer = new JMSMappingInboundTransformer(ActiveMQJMSVendor.INSTANCE);
+ } else if (transformer.equals(InboundTransformer.TRANSFORMER_NATIVE)) {
+ inboundTransformer = new AMQPNativeInboundTransformer(ActiveMQJMSVendor.INSTANCE);
+ } else if (transformer.equals(InboundTransformer.TRANSFORMER_RAW)) {
+ inboundTransformer = new AMQPRawInboundTransformer(ActiveMQJMSVendor.INSTANCE);
+ } else {
+ LOG.warn("Unknown transformer type " + transformer + ", using native one instead");
+ inboundTransformer = new AMQPNativeInboundTransformer(ActiveMQJMSVendor.INSTANCE);
+ }
+ }
+ return inboundTransformer;
+ }
abstract class BaseProducerContext extends AmqpDeliveryListener {
@@ -419,7 +435,7 @@ class AmqpProtocolConverter {
@Override
protected void onMessage(Receiver receiver, Delivery delivery, Buffer buffer) throws Exception {
EncodedMessage em = new EncodedMessage(delivery.getMessageFormat(), buffer.data, buffer.offset, buffer.length);
- final ActiveMQMessage message = (ActiveMQMessage) inboundTransformer.transform(em);
+ final ActiveMQMessage message = (ActiveMQMessage) getInboundTransformer().transform(em);
current = null;
if( message.getDestination()==null ) {
@@ -587,7 +603,7 @@ class AmqpProtocolConverter {
private Source createSource(ActiveMQDestination dest) {
org.apache.qpid.proton.type.messaging.Source rc = new org.apache.qpid.proton.type.messaging.Source();
- rc.setAddress(inboundTransformer.getVendor().toAddress(dest));
+ rc.setAddress(getInboundTransformer().getVendor().toAddress(dest));
return rc;
}
Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java?rev=1402148&r1=1402147&r2=1402148&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java Thu Oct 25 13:36:09 2012
@@ -40,4 +40,6 @@ public interface AmqpTransport {
public AmqpWireFormat getWireFormat();
public void stop() throws Exception;
+
+ public String getTransformer();
}
Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java?rev=1402148&r1=1402147&r2=1402148&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java Thu Oct 25 13:36:09 2012
@@ -21,6 +21,7 @@ import org.apache.activemq.command.Comma
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFilter;
import org.apache.activemq.transport.TransportListener;
+import org.apache.activemq.transport.amqp.transform.InboundTransformer;
import org.apache.activemq.transport.tcp.SslTransport;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.wireformat.WireFormat;
@@ -46,6 +47,7 @@ public class AmqpTransportFilter extends
private AmqpWireFormat wireFormat;
private boolean trace;
+ private String transformer = InboundTransformer.TRANSFORMER_NATIVE;
public AmqpTransportFilter(Transport next, WireFormat wireFormat, BrokerContext brokerContext) {
super(next);
@@ -161,4 +163,11 @@ public class AmqpTransportFilter extends
}
+ public String getTransformer() {
+ return transformer;
+ }
+
+ public void setTransformer(String transformer) {
+ this.transformer = transformer;
+ }
}
Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeInboundTransformer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeInboundTransformer.java?rev=1402148&r1=1402147&r2=1402148&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeInboundTransformer.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeInboundTransformer.java Thu Oct 25 13:36:09 2012
@@ -22,7 +22,7 @@ import javax.jms.Message;
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-public class AMQPNativeInboundTransformer extends InboundTransformer {
+public class AMQPNativeInboundTransformer extends AMQPRawInboundTransformer {
public AMQPNativeInboundTransformer(JMSVendor vendor) {
@@ -31,21 +31,11 @@ public class AMQPNativeInboundTransforme
@Override
public Message transform(EncodedMessage amqpMessage) throws Exception {
+ org.apache.qpid.proton.message.Message amqp = amqpMessage.decode();
- BytesMessage rc = vendor.createBytesMessage();
- rc.writeBytes(amqpMessage.getArray(), amqpMessage.getArrayOffset(), amqpMessage.getLength());
+ Message rc = super.transform(amqpMessage);
- rc.setJMSDeliveryMode(defaultDeliveryMode);
- rc.setJMSPriority(defaultPriority);
-
- final long now = System.currentTimeMillis();
- rc.setJMSTimestamp(now);
- if( defaultTtl > 0 ) {
- rc.setJMSExpiration(now + defaultTtl);
- }
-
- rc.setLongProperty(prefixVendor + "MESSAGE_FORMAT", amqpMessage.getMessageFormat());
- rc.setBooleanProperty(prefixVendor + "NATIVE", true);
+ populateMessage(rc, amqp);
return rc;
}
}
Copied: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPRawInboundTransformer.java (from r1401957, activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeInboundTransformer.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPRawInboundTransformer.java?p2=activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPRawInboundTransformer.java&p1=activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeInboundTransformer.java&r1=1401957&r2=1402148&rev=1402148&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeInboundTransformer.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPRawInboundTransformer.java Thu Oct 25 13:36:09 2012
@@ -19,19 +19,14 @@ package org.apache.activemq.transport.am
import javax.jms.BytesMessage;
import javax.jms.Message;
-/**
-* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
-*/
-public class AMQPNativeInboundTransformer extends InboundTransformer {
+public class AMQPRawInboundTransformer extends InboundTransformer {
-
- public AMQPNativeInboundTransformer(JMSVendor vendor) {
+ public AMQPRawInboundTransformer(JMSVendor vendor) {
super(vendor);
}
@Override
public Message transform(EncodedMessage amqpMessage) throws Exception {
-
BytesMessage rc = vendor.createBytesMessage();
rc.writeBytes(amqpMessage.getArray(), amqpMessage.getArrayOffset(), amqpMessage.getLength());
@@ -46,6 +41,7 @@ public class AMQPNativeInboundTransforme
rc.setLongProperty(prefixVendor + "MESSAGE_FORMAT", amqpMessage.getMessageFormat());
rc.setBooleanProperty(prefixVendor + "NATIVE", true);
+
return rc;
}
}
Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/EncodedMessage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/EncodedMessage.java?rev=1402148&r1=1402147&r2=1402148&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/EncodedMessage.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/EncodedMessage.java Thu Oct 25 13:36:09 2012
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.transport.amqp.transform;
+import org.apache.qpid.proton.message.Message;
import org.apache.qpid.proton.type.Binary;
/**
@@ -33,4 +34,19 @@ public class EncodedMessage extends Bina
public long getMessageFormat() {
return messageFormat;
}
+
+ public Message decode() throws Exception {
+ Message amqp = new Message();
+
+ int offset = getArrayOffset();
+ int len = getLength();
+ while( len > 0 ) {
+ final int decoded = amqp.decode(getArray(), offset, len);
+ assert decoded > 0: "Make progress decoding the message";
+ offset += decoded;
+ len -= decoded;
+ }
+
+ return amqp;
+ }
}
Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/InboundTransformer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/InboundTransformer.java?rev=1402148&r1=1402147&r2=1402148&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/InboundTransformer.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/InboundTransformer.java Thu Oct 25 13:36:09 2012
@@ -17,9 +17,20 @@
package org.apache.activemq.transport.amqp.transform;
import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.type.Binary;
+import org.apache.qpid.proton.type.messaging.ApplicationProperties;
+import org.apache.qpid.proton.type.messaging.DeliveryAnnotations;
+import org.apache.qpid.proton.type.messaging.Footer;
+import org.apache.qpid.proton.type.messaging.Header;
+import org.apache.qpid.proton.type.messaging.MessageAnnotations;
+import org.apache.qpid.proton.type.messaging.Properties;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
import javax.jms.Message;
import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -27,7 +38,16 @@ import java.io.IOException;
public abstract class InboundTransformer {
JMSVendor vendor;
+
+ public static final String TRANSFORMER_NATIVE = "native";
+ public static final String TRANSFORMER_RAW = "raw";
+ public static final String TRANSFORMER_JMS = "jms";
+
String prefixVendor = "JMS_AMQP_";
+ String prefixDeliveryAnnotations = "DA_";
+ String prefixMessageAnnotations= "MA_";
+ String prefixFooter = "FT_";
+
int defaultDeliveryMode = javax.jms.Message.DEFAULT_DELIVERY_MODE;
int defaultPriority = javax.jms.Message.DEFAULT_PRIORITY;
long defaultTtl = javax.jms.Message.DEFAULT_TIME_TO_LIVE;
@@ -77,4 +97,111 @@ public abstract class InboundTransformer
public void setVendor(JMSVendor vendor) {
this.vendor = vendor;
}
+
+ protected void populateMessage(Message jms, org.apache.qpid.proton.message.Message amqp) throws Exception {
+ final Header header = amqp.getHeader();
+ if( header!=null ) {
+ if( header.getDurable()!=null ) {
+ jms.setJMSDeliveryMode(header.getDurable().booleanValue() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+ }
+ if( header.getPriority()!=null ) {
+ jms.setJMSPriority(header.getPriority().intValue());
+ }
+ if( header.getTtl()!=null ) {
+ jms.setJMSExpiration(header.getTtl().longValue());
+ }
+ if( header.getFirstAcquirer() !=null ) {
+ jms.setBooleanProperty(prefixVendor + "FirstAcquirer", header.getFirstAcquirer());
+ }
+ if( header.getDeliveryCount()!=null ) {
+ vendor.setJMSXDeliveryCount(jms, header.getDeliveryCount().longValue());
+ }
+ }
+
+ final DeliveryAnnotations da = amqp.getDeliveryAnnotations();
+ if( da!=null ) {
+ for (Map.Entry entry : (Set<Map.Entry>)da.getValue().entrySet()) {
+ String key = entry.getKey().toString();
+ setProperty(jms, prefixVendor + prefixDeliveryAnnotations + key, entry.getValue());
+ }
+ }
+
+ final MessageAnnotations ma = amqp.getMessageAnnotations();
+ if( ma!=null ) {
+ for (Map.Entry entry : (Set<Map.Entry>)ma.getValue().entrySet()) {
+ String key = entry.getKey().toString();
+ setProperty(jms, prefixVendor + prefixMessageAnnotations + key, entry.getValue());
+ }
+ }
+
+ final Properties properties = amqp.getProperties();
+ if( properties!=null ) {
+ if( properties.getMessageId()!=null ) {
+ jms.setJMSMessageID(properties.getMessageId().toString());
+ }
+ Binary userId = properties.getUserId();
+ if( userId!=null ) {
+ vendor.setJMSXUserID(jms, new String(userId.getArray(), userId.getArrayOffset(), userId.getLength(), "UTF-8"));
+ }
+ if( properties.getTo()!=null ) {
+ jms.setJMSDestination(vendor.createDestination(properties.getTo()));
+ }
+ if( properties.getSubject()!=null ) {
+ jms.setStringProperty(prefixVendor + "Subject", properties.getSubject());
+ }
+ if( properties.getReplyTo() !=null ) {
+ jms.setJMSReplyTo(vendor.createDestination(properties.getReplyTo()));
+ }
+ if( properties.getCorrelationId() !=null ) {
+ jms.setJMSCorrelationID(properties.getCorrelationId().toString());
+ }
+ if( properties.getContentType() !=null ) {
+ jms.setStringProperty(prefixVendor + "ContentType", properties.getContentType().toString());
+ }
+ if( properties.getContentEncoding() !=null ) {
+ jms.setStringProperty(prefixVendor + "ContentEncoding", properties.getContentEncoding().toString());
+ }
+ if( properties.getCreationTime()!=null ) {
+ jms.setJMSTimestamp(properties.getCreationTime().getTime());
+ }
+ if( properties.getGroupId()!=null ) {
+ vendor.setJMSXGroupID(jms, properties.getGroupId());
+ }
+ if( properties.getGroupSequence()!=null ) {
+ vendor.setJMSXGroupSequence(jms, properties.getGroupSequence().intValue());
+ }
+ if( properties.getReplyToGroupId()!=null ) {
+ jms.setStringProperty(prefixVendor + "ReplyToGroupID", properties.getReplyToGroupId());
+ }
+ }
+
+ final ApplicationProperties ap = amqp.getApplicationProperties();
+ if( ap !=null ) {
+ for (Map.Entry entry : (Set<Map.Entry>)ap.getValue().entrySet()) {
+ String key = entry.getKey().toString();
+ setProperty(jms, key, entry.getValue());
+ }
+ }
+
+ final Footer fp = amqp.getFooter();
+ if( fp !=null ) {
+ for (Map.Entry entry : (Set<Map.Entry>)fp.getValue().entrySet()) {
+ String key = entry.getKey().toString();
+ setProperty(jms, prefixVendor + prefixFooter + key, entry.getValue());
+ }
+ }
+ }
+
+ private void setProperty(Message msg, String key, Object value) throws JMSException {
+ //TODO support all types
+ if( value instanceof String ) {
+ msg.setStringProperty(key, (String) value);
+ } else if( value instanceof Integer ) {
+ msg.setIntProperty(key, ((Integer) value).intValue());
+ } else if( value instanceof Long ) {
+ msg.setLongProperty(key, ((Long) value).longValue());
+ } else {
+ throw new RuntimeException("Unexpected value type: "+value.getClass());
+ }
+ }
}
Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingInboundTransformer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingInboundTransformer.java?rev=1402148&r1=1402147&r2=1402148&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingInboundTransformer.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingInboundTransformer.java Thu Oct 25 13:36:09 2012
@@ -30,26 +30,13 @@ import java.util.Set;
*/
public class JMSMappingInboundTransformer extends InboundTransformer {
- String prefixDeliveryAnnotations = "DA_";
- String prefixMessageAnnotations= "MA_";
- String prefixFooter = "FT_";
-
public JMSMappingInboundTransformer(JMSVendor vendor) {
super(vendor);
}
@Override
public Message transform(EncodedMessage amqpMessage) throws Exception {
- org.apache.qpid.proton.message.Message amqp = new org.apache.qpid.proton.message.Message();
-
- int offset = amqpMessage.getArrayOffset();
- int len = amqpMessage.getLength();
- while( len > 0 ) {
- final int decoded = amqp.decode(amqpMessage.getArray(), offset, len);
- assert decoded > 0: "Make progress decoding the message";
- offset += decoded;
- len -= decoded;
- }
+ org.apache.qpid.proton.message.Message amqp = amqpMessage.decode();
Message rc;
final Section body = amqp.getBody();
@@ -105,113 +92,10 @@ public class JMSMappingInboundTransforme
rc.setJMSPriority(defaultPriority);
rc.setJMSExpiration(defaultTtl);
- final Header header = amqp.getHeader();
- if( header!=null ) {
- if( header.getDurable()!=null ) {
- rc.setJMSDeliveryMode(header.getDurable().booleanValue() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
- }
- if( header.getPriority()!=null ) {
- rc.setJMSPriority(header.getPriority().intValue());
- }
- if( header.getTtl()!=null ) {
- rc.setJMSExpiration(header.getTtl().longValue());
- }
- if( header.getFirstAcquirer() !=null ) {
- rc.setBooleanProperty(prefixVendor + "FirstAcquirer", header.getFirstAcquirer());
- }
- if( header.getDeliveryCount()!=null ) {
- vendor.setJMSXDeliveryCount(rc, header.getDeliveryCount().longValue());
- }
- }
-
- final DeliveryAnnotations da = amqp.getDeliveryAnnotations();
- if( da!=null ) {
- for (Map.Entry entry : (Set<Map.Entry>)da.getValue().entrySet()) {
- String key = entry.getKey().toString();
- setProperty(rc, prefixVendor + prefixDeliveryAnnotations + key, entry.getValue());
- }
- }
-
- final MessageAnnotations ma = amqp.getMessageAnnotations();
- if( ma!=null ) {
- for (Map.Entry entry : (Set<Map.Entry>)ma.getValue().entrySet()) {
- String key = entry.getKey().toString();
- setProperty(rc, prefixVendor + prefixMessageAnnotations + key, entry.getValue());
- }
- }
-
- final Properties properties = amqp.getProperties();
- if( properties!=null ) {
- if( properties.getMessageId()!=null ) {
- rc.setJMSMessageID(properties.getMessageId().toString());
- }
- Binary userId = properties.getUserId();
- if( userId!=null ) {
- vendor.setJMSXUserID(rc, new String(userId.getArray(), userId.getArrayOffset(), userId.getLength(), "UTF-8"));
- }
- if( properties.getTo()!=null ) {
- rc.setJMSDestination(vendor.createDestination(properties.getTo()));
- }
- if( properties.getSubject()!=null ) {
- rc.setStringProperty(prefixVendor + "Subject", properties.getSubject());
- }
- if( properties.getReplyTo() !=null ) {
- rc.setJMSReplyTo(vendor.createDestination(properties.getReplyTo()));
- }
- if( properties.getCorrelationId() !=null ) {
- rc.setJMSCorrelationID(properties.getCorrelationId().toString());
- }
- if( properties.getContentType() !=null ) {
- rc.setStringProperty(prefixVendor + "ContentType", properties.getContentType().toString());
- }
- if( properties.getContentEncoding() !=null ) {
- rc.setStringProperty(prefixVendor + "ContentEncoding", properties.getContentEncoding().toString());
- }
- if( properties.getCreationTime()!=null ) {
- rc.setJMSTimestamp(properties.getCreationTime().getTime());
- }
- if( properties.getGroupId()!=null ) {
- vendor.setJMSXGroupID(rc, properties.getGroupId());
- }
- if( properties.getGroupSequence()!=null ) {
- vendor.setJMSXGroupSequence(rc, properties.getGroupSequence().intValue());
- }
- if( properties.getReplyToGroupId()!=null ) {
- rc.setStringProperty(prefixVendor + "ReplyToGroupID", properties.getReplyToGroupId());
- }
- }
-
- final ApplicationProperties ap = amqp.getApplicationProperties();
- if( ap !=null ) {
- for (Map.Entry entry : (Set<Map.Entry>)ap.getValue().entrySet()) {
- String key = entry.getKey().toString();
- setProperty(rc, key, entry.getValue());
- }
- }
-
- final Footer fp = amqp.getFooter();
- if( fp !=null ) {
- for (Map.Entry entry : (Set<Map.Entry>)fp.getValue().entrySet()) {
- String key = entry.getKey().toString();
- setProperty(rc, prefixVendor + prefixFooter + key, entry.getValue());
- }
- }
+ populateMessage(rc, amqp);
rc.setLongProperty(prefixVendor + "MESSAGE_FORMAT", amqpMessage.getMessageFormat());
rc.setBooleanProperty(prefixVendor + "NATIVE", false);
return rc;
}
-
- private void setProperty(Message msg, String key, Object value) throws JMSException {
- //TODO support all types
- if( value instanceof String ) {
- msg.setStringProperty(key, (String) value);
- } else if( value instanceof Integer ) {
- msg.setIntProperty(key, ((Integer) value).intValue());
- } else if( value instanceof Long ) {
- msg.setLongProperty(key, ((Long) value).longValue());
- } else {
- throw new RuntimeException("Unexpected value type: "+value.getClass());
- }
- }
}