You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2012/10/25 15:36:10 UTC

svn commit: r1402148 - in /activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp: ./ transform/

Author: dejanb
Date: Thu Oct 25 13:36:09 2012
New Revision: 1402148

URL: http://svn.apache.org/viewvc?rev=1402148&view=rev
Log:
amqp - support configurable transformers and populate message properties for the default native one

Added:
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPRawInboundTransformer.java
      - copied, changed from r1401957, activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeInboundTransformer.java
Modified:
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeInboundTransformer.java
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/EncodedMessage.java
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/InboundTransformer.java
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingInboundTransformer.java

Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java?rev=1402148&r1=1402147&r2=1402148&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java Thu Oct 25 13:36:09 2012
@@ -363,8 +363,24 @@ class AmqpProtocolConverter {
         }
     }
 
-    //InboundTransformer inboundTransformer = new AMQPNativeInboundTransformer(ActiveMQJMSVendor.INSTANCE);
-    InboundTransformer inboundTransformer = new JMSMappingInboundTransformer(ActiveMQJMSVendor.INSTANCE);
+    InboundTransformer inboundTransformer;
+
+    protected InboundTransformer getInboundTransformer()  {
+        if (inboundTransformer == null) {
+            String transformer = amqpTransport.getTransformer();
+            if (transformer.equals(InboundTransformer.TRANSFORMER_JMS)) {
+                inboundTransformer = new JMSMappingInboundTransformer(ActiveMQJMSVendor.INSTANCE);
+            } else if (transformer.equals(InboundTransformer.TRANSFORMER_NATIVE)) {
+                inboundTransformer = new AMQPNativeInboundTransformer(ActiveMQJMSVendor.INSTANCE);
+            } else if (transformer.equals(InboundTransformer.TRANSFORMER_RAW)) {
+                inboundTransformer = new AMQPRawInboundTransformer(ActiveMQJMSVendor.INSTANCE);
+            } else {
+                LOG.warn("Unknown transformer type " + transformer + ", using native one instead");
+                inboundTransformer = new AMQPNativeInboundTransformer(ActiveMQJMSVendor.INSTANCE);
+            }
+        }
+        return inboundTransformer;
+    }
 
     abstract class BaseProducerContext extends AmqpDeliveryListener {
 
@@ -419,7 +435,7 @@ class AmqpProtocolConverter {
         @Override
         protected void onMessage(Receiver receiver, Delivery delivery, Buffer buffer) throws Exception {
             EncodedMessage em = new EncodedMessage(delivery.getMessageFormat(), buffer.data, buffer.offset, buffer.length);
-            final ActiveMQMessage message = (ActiveMQMessage) inboundTransformer.transform(em);
+            final ActiveMQMessage message = (ActiveMQMessage) getInboundTransformer().transform(em);
             current = null;
 
             if( message.getDestination()==null ) {
@@ -587,7 +603,7 @@ class AmqpProtocolConverter {
 
     private Source createSource(ActiveMQDestination dest) {
         org.apache.qpid.proton.type.messaging.Source rc = new org.apache.qpid.proton.type.messaging.Source();
-        rc.setAddress(inboundTransformer.getVendor().toAddress(dest));
+        rc.setAddress(getInboundTransformer().getVendor().toAddress(dest));
         return rc;
     }
 

Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java?rev=1402148&r1=1402147&r2=1402148&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java Thu Oct 25 13:36:09 2012
@@ -40,4 +40,6 @@ public interface AmqpTransport {
     public AmqpWireFormat getWireFormat();
 
     public void stop() throws Exception;
+
+    public String getTransformer();
 }

Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java?rev=1402148&r1=1402147&r2=1402148&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java Thu Oct 25 13:36:09 2012
@@ -21,6 +21,7 @@ import org.apache.activemq.command.Comma
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportFilter;
 import org.apache.activemq.transport.TransportListener;
+import org.apache.activemq.transport.amqp.transform.InboundTransformer;
 import org.apache.activemq.transport.tcp.SslTransport;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.wireformat.WireFormat;
@@ -46,6 +47,7 @@ public class AmqpTransportFilter extends
     private AmqpWireFormat wireFormat;
 
     private boolean trace;
+    private String transformer = InboundTransformer.TRANSFORMER_NATIVE;
 
     public AmqpTransportFilter(Transport next, WireFormat wireFormat, BrokerContext brokerContext) {
         super(next);
@@ -161,4 +163,11 @@ public class AmqpTransportFilter extends
     }
 
 
+    public String getTransformer() {
+        return transformer;
+    }
+
+    public void setTransformer(String transformer) {
+        this.transformer = transformer;
+    }
 }

Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeInboundTransformer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeInboundTransformer.java?rev=1402148&r1=1402147&r2=1402148&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeInboundTransformer.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeInboundTransformer.java Thu Oct 25 13:36:09 2012
@@ -22,7 +22,7 @@ import javax.jms.Message;
 /**
 * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
 */
-public class AMQPNativeInboundTransformer extends InboundTransformer {
+public class AMQPNativeInboundTransformer extends AMQPRawInboundTransformer {
 
 
     public AMQPNativeInboundTransformer(JMSVendor vendor) {
@@ -31,21 +31,11 @@ public class AMQPNativeInboundTransforme
 
     @Override
     public Message transform(EncodedMessage amqpMessage) throws Exception {
+        org.apache.qpid.proton.message.Message amqp = amqpMessage.decode();
 
-        BytesMessage rc = vendor.createBytesMessage();
-        rc.writeBytes(amqpMessage.getArray(), amqpMessage.getArrayOffset(), amqpMessage.getLength());
+        Message rc = super.transform(amqpMessage);
 
-        rc.setJMSDeliveryMode(defaultDeliveryMode);
-        rc.setJMSPriority(defaultPriority);
-
-        final long now = System.currentTimeMillis();
-        rc.setJMSTimestamp(now);
-        if( defaultTtl > 0 ) {
-            rc.setJMSExpiration(now + defaultTtl);
-        }
-
-        rc.setLongProperty(prefixVendor + "MESSAGE_FORMAT", amqpMessage.getMessageFormat());
-        rc.setBooleanProperty(prefixVendor + "NATIVE", true);
+        populateMessage(rc, amqp);
         return rc;
     }
 }

Copied: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPRawInboundTransformer.java (from r1401957, activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeInboundTransformer.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPRawInboundTransformer.java?p2=activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPRawInboundTransformer.java&p1=activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeInboundTransformer.java&r1=1401957&r2=1402148&rev=1402148&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeInboundTransformer.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPRawInboundTransformer.java Thu Oct 25 13:36:09 2012
@@ -19,19 +19,14 @@ package org.apache.activemq.transport.am
 import javax.jms.BytesMessage;
 import javax.jms.Message;
 
-/**
-* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
-*/
-public class AMQPNativeInboundTransformer extends InboundTransformer {
+public class AMQPRawInboundTransformer extends InboundTransformer {
 
-
-    public AMQPNativeInboundTransformer(JMSVendor vendor) {
+    public AMQPRawInboundTransformer(JMSVendor vendor) {
         super(vendor);
     }
 
     @Override
     public Message transform(EncodedMessage amqpMessage) throws Exception {
-
         BytesMessage rc = vendor.createBytesMessage();
         rc.writeBytes(amqpMessage.getArray(), amqpMessage.getArrayOffset(), amqpMessage.getLength());
 
@@ -46,6 +41,7 @@ public class AMQPNativeInboundTransforme
 
         rc.setLongProperty(prefixVendor + "MESSAGE_FORMAT", amqpMessage.getMessageFormat());
         rc.setBooleanProperty(prefixVendor + "NATIVE", true);
+
         return rc;
     }
 }

Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/EncodedMessage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/EncodedMessage.java?rev=1402148&r1=1402147&r2=1402148&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/EncodedMessage.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/EncodedMessage.java Thu Oct 25 13:36:09 2012
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.transport.amqp.transform;
 
+import org.apache.qpid.proton.message.Message;
 import org.apache.qpid.proton.type.Binary;
 
 /**
@@ -33,4 +34,19 @@ public class EncodedMessage extends Bina
     public long getMessageFormat() {
         return messageFormat;
     }
+
+    public Message decode() throws Exception {
+        Message amqp = new Message();
+
+        int offset = getArrayOffset();
+        int len = getLength();
+        while( len > 0 ) {
+            final int decoded = amqp.decode(getArray(), offset, len);
+            assert decoded > 0: "Make progress decoding the message";
+            offset += decoded;
+            len -= decoded;
+        }
+
+        return amqp;
+    }
 }

Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/InboundTransformer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/InboundTransformer.java?rev=1402148&r1=1402147&r2=1402148&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/InboundTransformer.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/InboundTransformer.java Thu Oct 25 13:36:09 2012
@@ -17,9 +17,20 @@
 package org.apache.activemq.transport.amqp.transform;
 
 import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.type.Binary;
+import org.apache.qpid.proton.type.messaging.ApplicationProperties;
+import org.apache.qpid.proton.type.messaging.DeliveryAnnotations;
+import org.apache.qpid.proton.type.messaging.Footer;
+import org.apache.qpid.proton.type.messaging.Header;
+import org.apache.qpid.proton.type.messaging.MessageAnnotations;
+import org.apache.qpid.proton.type.messaging.Properties;
 
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
 import javax.jms.Message;
 import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
 
 /**
 * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -27,7 +38,16 @@ import java.io.IOException;
 public abstract class InboundTransformer {
 
     JMSVendor vendor;
+
+    public static final String TRANSFORMER_NATIVE = "native";
+    public static final String TRANSFORMER_RAW = "raw";
+    public static final String TRANSFORMER_JMS = "jms";
+
     String prefixVendor = "JMS_AMQP_";
+    String prefixDeliveryAnnotations = "DA_";
+    String prefixMessageAnnotations= "MA_";
+    String prefixFooter = "FT_";
+
     int defaultDeliveryMode = javax.jms.Message.DEFAULT_DELIVERY_MODE;
     int defaultPriority = javax.jms.Message.DEFAULT_PRIORITY;
     long defaultTtl = javax.jms.Message.DEFAULT_TIME_TO_LIVE;
@@ -77,4 +97,111 @@ public abstract class InboundTransformer
     public void setVendor(JMSVendor vendor) {
         this.vendor = vendor;
     }
+
+    protected void populateMessage(Message jms, org.apache.qpid.proton.message.Message amqp) throws Exception {
+        final Header header = amqp.getHeader();
+        if( header!=null ) {
+            if( header.getDurable()!=null ) {
+                jms.setJMSDeliveryMode(header.getDurable().booleanValue() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+            }
+            if( header.getPriority()!=null ) {
+                jms.setJMSPriority(header.getPriority().intValue());
+            }
+            if( header.getTtl()!=null ) {
+                jms.setJMSExpiration(header.getTtl().longValue());
+            }
+            if( header.getFirstAcquirer() !=null ) {
+                jms.setBooleanProperty(prefixVendor + "FirstAcquirer", header.getFirstAcquirer());
+            }
+            if( header.getDeliveryCount()!=null ) {
+                vendor.setJMSXDeliveryCount(jms, header.getDeliveryCount().longValue());
+            }
+        }
+
+        final DeliveryAnnotations da = amqp.getDeliveryAnnotations();
+        if( da!=null ) {
+            for (Map.Entry entry : (Set<Map.Entry>)da.getValue().entrySet()) {
+                String key = entry.getKey().toString();
+                setProperty(jms, prefixVendor + prefixDeliveryAnnotations + key, entry.getValue());
+            }
+        }
+
+        final MessageAnnotations ma = amqp.getMessageAnnotations();
+        if( ma!=null ) {
+            for (Map.Entry entry : (Set<Map.Entry>)ma.getValue().entrySet()) {
+                String key = entry.getKey().toString();
+                setProperty(jms, prefixVendor + prefixMessageAnnotations + key, entry.getValue());
+            }
+        }
+
+        final Properties properties = amqp.getProperties();
+        if( properties!=null ) {
+            if( properties.getMessageId()!=null ) {
+                jms.setJMSMessageID(properties.getMessageId().toString());
+            }
+            Binary userId = properties.getUserId();
+            if( userId!=null ) {
+                vendor.setJMSXUserID(jms, new String(userId.getArray(), userId.getArrayOffset(), userId.getLength(), "UTF-8"));
+            }
+            if( properties.getTo()!=null ) {
+                jms.setJMSDestination(vendor.createDestination(properties.getTo()));
+            }
+            if( properties.getSubject()!=null ) {
+                jms.setStringProperty(prefixVendor + "Subject", properties.getSubject());
+            }
+            if( properties.getReplyTo() !=null ) {
+                jms.setJMSReplyTo(vendor.createDestination(properties.getReplyTo()));
+            }
+            if( properties.getCorrelationId() !=null ) {
+                jms.setJMSCorrelationID(properties.getCorrelationId().toString());
+            }
+            if( properties.getContentType() !=null ) {
+                jms.setStringProperty(prefixVendor + "ContentType", properties.getContentType().toString());
+            }
+            if( properties.getContentEncoding() !=null ) {
+                jms.setStringProperty(prefixVendor + "ContentEncoding", properties.getContentEncoding().toString());
+            }
+            if( properties.getCreationTime()!=null ) {
+                jms.setJMSTimestamp(properties.getCreationTime().getTime());
+            }
+            if( properties.getGroupId()!=null ) {
+                vendor.setJMSXGroupID(jms, properties.getGroupId());
+            }
+            if( properties.getGroupSequence()!=null ) {
+                vendor.setJMSXGroupSequence(jms, properties.getGroupSequence().intValue());
+            }
+            if( properties.getReplyToGroupId()!=null ) {
+                jms.setStringProperty(prefixVendor + "ReplyToGroupID", properties.getReplyToGroupId());
+            }
+        }
+
+        final ApplicationProperties ap = amqp.getApplicationProperties();
+        if( ap !=null ) {
+            for (Map.Entry entry : (Set<Map.Entry>)ap.getValue().entrySet()) {
+                String key = entry.getKey().toString();
+                setProperty(jms, key, entry.getValue());
+            }
+        }
+
+        final Footer fp = amqp.getFooter();
+        if( fp !=null ) {
+            for (Map.Entry entry : (Set<Map.Entry>)fp.getValue().entrySet()) {
+                String key = entry.getKey().toString();
+                setProperty(jms, prefixVendor + prefixFooter + key, entry.getValue());
+            }
+        }
+    }
+
+    private void setProperty(Message msg, String key, Object value) throws JMSException {
+        //TODO support all types
+        if( value instanceof String ) {
+            msg.setStringProperty(key, (String) value);
+        } else if( value instanceof Integer ) {
+            msg.setIntProperty(key, ((Integer) value).intValue());
+        } else if( value instanceof Long ) {
+            msg.setLongProperty(key, ((Long) value).longValue());
+        } else {
+            throw new RuntimeException("Unexpected value type: "+value.getClass());
+        }
+    }
 }

Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingInboundTransformer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingInboundTransformer.java?rev=1402148&r1=1402147&r2=1402148&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingInboundTransformer.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingInboundTransformer.java Thu Oct 25 13:36:09 2012
@@ -30,26 +30,13 @@ import java.util.Set;
 */
 public class JMSMappingInboundTransformer extends InboundTransformer {
 
-    String prefixDeliveryAnnotations = "DA_";
-    String prefixMessageAnnotations= "MA_";
-    String prefixFooter = "FT_";
-
     public JMSMappingInboundTransformer(JMSVendor vendor) {
         super(vendor);
     }
 
     @Override
     public Message transform(EncodedMessage amqpMessage) throws Exception {
-        org.apache.qpid.proton.message.Message amqp = new org.apache.qpid.proton.message.Message();
-
-        int offset = amqpMessage.getArrayOffset();
-        int len = amqpMessage.getLength();
-        while( len > 0 ) {
-            final int decoded = amqp.decode(amqpMessage.getArray(), offset, len);
-            assert decoded > 0: "Make progress decoding the message";
-            offset += decoded;
-            len -= decoded;
-        }
+        org.apache.qpid.proton.message.Message amqp = amqpMessage.decode();
 
         Message rc;
         final Section body = amqp.getBody();
@@ -105,113 +92,10 @@ public class JMSMappingInboundTransforme
         rc.setJMSPriority(defaultPriority);
         rc.setJMSExpiration(defaultTtl);
 
-        final Header header = amqp.getHeader();
-        if( header!=null ) {
-            if( header.getDurable()!=null ) {
-                rc.setJMSDeliveryMode(header.getDurable().booleanValue() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-            }
-            if( header.getPriority()!=null ) {
-                rc.setJMSPriority(header.getPriority().intValue());
-            }
-            if( header.getTtl()!=null ) {
-                rc.setJMSExpiration(header.getTtl().longValue());
-            }
-            if( header.getFirstAcquirer() !=null ) {
-                rc.setBooleanProperty(prefixVendor + "FirstAcquirer", header.getFirstAcquirer());
-            }
-            if( header.getDeliveryCount()!=null ) {
-                vendor.setJMSXDeliveryCount(rc, header.getDeliveryCount().longValue());
-            }
-        }
-
-        final DeliveryAnnotations da = amqp.getDeliveryAnnotations();
-        if( da!=null ) {
-            for (Map.Entry entry : (Set<Map.Entry>)da.getValue().entrySet()) {
-                String key = entry.getKey().toString();
-                setProperty(rc, prefixVendor + prefixDeliveryAnnotations + key, entry.getValue());
-            }
-        }
-
-        final MessageAnnotations ma = amqp.getMessageAnnotations();
-        if( ma!=null ) {
-            for (Map.Entry entry : (Set<Map.Entry>)ma.getValue().entrySet()) {
-                String key = entry.getKey().toString();
-                setProperty(rc, prefixVendor + prefixMessageAnnotations + key, entry.getValue());
-            }
-        }
-
-        final Properties properties = amqp.getProperties();
-        if( properties!=null ) {
-            if( properties.getMessageId()!=null ) {
-                rc.setJMSMessageID(properties.getMessageId().toString());
-            }
-            Binary userId = properties.getUserId();
-            if( userId!=null ) {
-                vendor.setJMSXUserID(rc, new String(userId.getArray(), userId.getArrayOffset(), userId.getLength(), "UTF-8"));
-            }
-            if( properties.getTo()!=null ) {
-                rc.setJMSDestination(vendor.createDestination(properties.getTo()));
-            }
-            if( properties.getSubject()!=null ) {
-                rc.setStringProperty(prefixVendor + "Subject", properties.getSubject());
-            }
-            if( properties.getReplyTo() !=null ) {
-                rc.setJMSReplyTo(vendor.createDestination(properties.getReplyTo()));
-            }
-            if( properties.getCorrelationId() !=null ) {
-                rc.setJMSCorrelationID(properties.getCorrelationId().toString());
-            }
-            if( properties.getContentType() !=null ) {
-                rc.setStringProperty(prefixVendor + "ContentType", properties.getContentType().toString());
-            }
-            if( properties.getContentEncoding() !=null ) {
-                rc.setStringProperty(prefixVendor + "ContentEncoding", properties.getContentEncoding().toString());
-            }
-            if( properties.getCreationTime()!=null ) {
-                rc.setJMSTimestamp(properties.getCreationTime().getTime());
-            }
-            if( properties.getGroupId()!=null ) {
-                vendor.setJMSXGroupID(rc, properties.getGroupId());
-            }
-            if( properties.getGroupSequence()!=null ) {
-                vendor.setJMSXGroupSequence(rc, properties.getGroupSequence().intValue());
-            }
-            if( properties.getReplyToGroupId()!=null ) {
-                rc.setStringProperty(prefixVendor + "ReplyToGroupID", properties.getReplyToGroupId());
-            }
-        }
-
-        final ApplicationProperties ap = amqp.getApplicationProperties();
-        if( ap !=null ) {
-            for (Map.Entry entry : (Set<Map.Entry>)ap.getValue().entrySet()) {
-                String key = entry.getKey().toString();
-                setProperty(rc, key, entry.getValue());
-            }
-        }
-
-        final Footer fp = amqp.getFooter();
-        if( fp !=null ) {
-            for (Map.Entry entry : (Set<Map.Entry>)fp.getValue().entrySet()) {
-                String key = entry.getKey().toString();
-                setProperty(rc, prefixVendor + prefixFooter + key, entry.getValue());
-            }
-        }
+        populateMessage(rc, amqp);
 
         rc.setLongProperty(prefixVendor + "MESSAGE_FORMAT", amqpMessage.getMessageFormat());
         rc.setBooleanProperty(prefixVendor + "NATIVE", false);
         return rc;
     }
-
-    private void setProperty(Message msg, String key, Object value) throws JMSException {
-        //TODO support all types
-        if( value instanceof String ) {
-            msg.setStringProperty(key, (String) value);
-        } else if( value instanceof Integer ) {
-            msg.setIntProperty(key, ((Integer) value).intValue());
-        } else if( value instanceof Long ) {
-            msg.setLongProperty(key, ((Long) value).longValue());
-        } else {
-            throw new RuntimeException("Unexpected value type: "+value.getClass());
-        }
-    }
 }