You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/10/04 23:00:26 UTC

svn commit: r1394264 - in /activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp: ./ transform/

Author: chirino
Date: Thu Oct  4 21:00:25 2012
New Revision: 1394264

URL: http://svn.apache.org/viewvc?rev=1394264&view=rev
Log:
Improving the AMQP<-->JMS message mapping impl.

Added:
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AutoOutboundTransformer.java
      - copied, changed from r1393790, activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeInboundTransformer.java
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/DroppingWritableBuffer.java
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/EncodedMessage.java
      - copied, changed from r1393790, activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/OutboundTransformer.java
Modified:
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ActiveMQJMSVendor.java
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeInboundTransformer.java
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeOutboundTransformer.java
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/InboundTransformer.java
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingInboundTransformer.java
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingOutboundTransformer.java
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSVendor.java
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/OutboundTransformer.java

Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ActiveMQJMSVendor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ActiveMQJMSVendor.java?rev=1394264&r1=1394263&r2=1394264&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ActiveMQJMSVendor.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ActiveMQJMSVendor.java Thu Oct  4 21:00:25 2012
@@ -64,4 +64,14 @@ public class ActiveMQJMSVendor extends J
     public void setJMSXGroupSequence(Message msg, int value) {
         ((ActiveMQMessage)msg).setGroupSequence(value);
     }
+
+    @Override
+    public void setJMSXDeliveryCount(Message msg, long value) {
+        ((ActiveMQMessage)msg).setRedeliveryCounter((int) value);
+    }
+
+    @Override
+    public String toAddress(Destination dest) {
+        return ((ActiveMQDestination)dest).getQualifiedName();
+    }
 }

Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java?rev=1394264&r1=1394263&r2=1394264&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java Thu Oct  4 21:00:25 2012
@@ -289,7 +289,7 @@ class AmqpProtocolConverter {
         }
     }
 
-    InboundTransformer inboundTransformer = new AMQPNativeInboundTransformer(ActiveMQJMSVendor.INSTANCE);
+    InboundTransformer inboundTransformer = new JMSMappingInboundTransformer(ActiveMQJMSVendor.INSTANCE);
 
     class ProducerContext extends AmqpDeliveryListener {
         private final ProducerId producerId;
@@ -322,7 +322,8 @@ class AmqpProtocolConverter {
             }
 
             final Buffer buffer = current.toBuffer();
-            final ActiveMQMessage message = (ActiveMQMessage) inboundTransformer.transform(delivery.getMessageFormat(), buffer.data, buffer.offset, buffer.length);
+            EncodedMessage em = new EncodedMessage(delivery.getMessageFormat(), buffer.data, buffer.offset, buffer.length);
+            final ActiveMQMessage message = (ActiveMQMessage) inboundTransformer.transform(em);
             current = null;
 
             if( message.getDestination()==null ) {
@@ -365,7 +366,7 @@ class AmqpProtocolConverter {
 
     }
 
-    OutboundTransformer outboundTransformer = new AMQPNativeOutboundTransformer(ActiveMQJMSVendor.INSTANCE);
+    OutboundTransformer outboundTransformer = new AutoOutboundTransformer(ActiveMQJMSVendor.INSTANCE);
 
     class ConsumerContext extends AmqpDeliveryListener {
         private final ConsumerId consumerId;
@@ -432,17 +433,19 @@ class AmqpProtocolConverter {
                 }
 
                 final MessageDispatch md = outbound.removeFirst();
-                final byte[] tag = nextTag();
-                final Delivery delivery = sender.delivery(tag, 0, tag.length);
-                delivery.setContext(md);
-
                 try {
                     final ActiveMQMessage jms = (ActiveMQMessage) md.getMessage();
-                    final byte[] amqpMessage = outboundTransformer.transform(jms);
-                    if( amqpMessage!=null && amqpMessage.length > 0 ) {
-                        current = new Buffer(amqpMessage);
+                    final EncodedMessage amqp = outboundTransformer.transform(jms);
+                    if( amqp!=null && amqp.getLength() > 0 ) {
+
+                        current = new Buffer(amqp.getArray(), amqp.getArrayOffset(), amqp.getLength());
+                        final byte[] tag = nextTag();
+                        final Delivery delivery = sender.delivery(tag, 0, tag.length);
+                        delivery.setContext(md);
+
                     } else {
                         // TODO: message could not be generated what now?
+
                     }
                 } catch (Exception e) {
                     e.printStackTrace();

Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeInboundTransformer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeInboundTransformer.java?rev=1394264&r1=1394263&r2=1394264&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeInboundTransformer.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeInboundTransformer.java Thu Oct  4 21:00:25 2012
@@ -30,10 +30,10 @@ public class AMQPNativeInboundTransforme
     }
 
     @Override
-    public Message transform(long messageFormat, byte [] amqpMessage, int offset, int len) throws Exception {
+    public Message transform(EncodedMessage amqpMessage) throws Exception {
 
         BytesMessage rc = vendor.createBytesMessage();
-        rc.writeBytes(amqpMessage, offset, len);
+        rc.writeBytes(amqpMessage.getArray(), amqpMessage.getArrayOffset(), amqpMessage.getLength());
 
         rc.setJMSDeliveryMode(defaultDeliveryMode);
         rc.setJMSPriority(defaultPriority);
@@ -44,7 +44,7 @@ public class AMQPNativeInboundTransforme
             rc.setJMSExpiration(now + defaultTtl);
         }
 
-        rc.setLongProperty(prefixVendor + "MESSAGE_FORMAT", messageFormat);
+        rc.setLongProperty(prefixVendor + "MESSAGE_FORMAT", amqpMessage.getMessageFormat());
         rc.setBooleanProperty(prefixVendor + "NATIVE", true);
         return rc;
     }

Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeOutboundTransformer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeOutboundTransformer.java?rev=1394264&r1=1394263&r2=1394264&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeOutboundTransformer.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeOutboundTransformer.java Thu Oct  4 21:00:25 2012
@@ -17,6 +17,7 @@
 package org.apache.activemq.transport.amqp.transform;
 
 import javax.jms.BytesMessage;
+import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageFormatException;
 
@@ -30,30 +31,31 @@ public class AMQPNativeOutboundTransform
     }
 
     @Override
-    public byte[] transform(Message jms) throws Exception {
-        if( jms == null )
+    public EncodedMessage transform(Message msg) throws Exception {
+        if( msg == null )
             return null;
-        if( !(jms instanceof BytesMessage) )
+        if( !(msg instanceof BytesMessage) )
             return null;
-
-        long messageFormat;
         try {
-            if( !jms.getBooleanProperty(prefixVendor + "NATIVE") ) {
+            if( !msg.getBooleanProperty(prefixVendor + "NATIVE") ) {
                 return null;
             }
-            messageFormat = jms.getLongProperty(prefixVendor + "MESSAGE_FORMAT");
         } catch (MessageFormatException e) {
             return null;
         }
-
-        // TODO: Proton should probably expose a way to set the msg format
-        // delivery.settMessageFormat(messageFormat);
-
-        BytesMessage bytesMessage = (BytesMessage) jms;
-        byte data[] = new byte[(int) bytesMessage.getBodyLength()];
-        bytesMessage.readBytes(data);
-        return data;
+        return transform(this, (BytesMessage) msg);
     }
 
+    static EncodedMessage transform(OutboundTransformer options, BytesMessage msg) throws JMSException {
+        long messageFormat;
+        try {
+            messageFormat = msg.getLongProperty(options.prefixVendor + "MESSAGE_FORMAT");
+        } catch (MessageFormatException e) {
+            return null;
+        }
+        byte data[] = new byte[(int) msg.getBodyLength()];
+        msg.readBytes(data);
+        return new EncodedMessage(messageFormat, data, 0, data.length);
+    }
 
 }

Copied: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AutoOutboundTransformer.java (from r1393790, activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeInboundTransformer.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AutoOutboundTransformer.java?p2=activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AutoOutboundTransformer.java&p1=activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeInboundTransformer.java&r1=1393790&r2=1394264&rev=1394264&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeInboundTransformer.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AutoOutboundTransformer.java Thu Oct  4 21:00:25 2012
@@ -17,35 +17,32 @@
 package org.apache.activemq.transport.amqp.transform;
 
 import javax.jms.BytesMessage;
+import javax.jms.JMSException;
 import javax.jms.Message;
+import javax.jms.MessageFormatException;
 
 /**
 * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
 */
-public class AMQPNativeInboundTransformer extends InboundTransformer {
+public class AutoOutboundTransformer extends JMSMappingOutboundTransformer {
 
-
-    public AMQPNativeInboundTransformer(JMSVendor vendor) {
+    public AutoOutboundTransformer(JMSVendor vendor) {
         super(vendor);
     }
 
     @Override
-    public Message transform(long messageFormat, byte [] amqpMessage, int offset, int len) throws Exception {
-
-        BytesMessage rc = vendor.createBytesMessage();
-        rc.writeBytes(amqpMessage, offset, len);
-
-        rc.setJMSDeliveryMode(defaultDeliveryMode);
-        rc.setJMSPriority(defaultPriority);
-
-        final long now = System.currentTimeMillis();
-        rc.setJMSTimestamp(now);
-        if( defaultTtl > 0 ) {
-            rc.setJMSExpiration(now + defaultTtl);
+    public EncodedMessage transform(Message msg) throws Exception {
+        if( msg == null )
+            return null;
+        if( msg.getBooleanProperty(prefixVendor + "NATIVE") ) {
+            if( msg instanceof BytesMessage ) {
+                return AMQPNativeOutboundTransformer.transform(this, (BytesMessage)msg);
+            } else {
+                return null;
+            }
+        } else {
+            return JMSMappingOutboundTransformer.transform(this, msg);
         }
-
-        rc.setLongProperty(prefixVendor + "MESSAGE_FORMAT", messageFormat);
-        rc.setBooleanProperty(prefixVendor + "NATIVE", true);
-        return rc;
     }
+
 }

Added: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/DroppingWritableBuffer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/DroppingWritableBuffer.java?rev=1394264&view=auto
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/DroppingWritableBuffer.java (added)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/DroppingWritableBuffer.java Thu Oct  4 21:00:25 2012
@@ -0,0 +1,91 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.activemq.transport.amqp.transform;
+
+import org.apache.qpid.proton.codec.WritableBuffer;
+
+import java.nio.ByteBuffer;
+
+public class DroppingWritableBuffer implements WritableBuffer
+{
+    int pos = 0;
+
+    @Override
+    public boolean hasRemaining() {
+        return true;
+    }
+
+    @Override
+    public void put(byte b) {
+        pos += 1;
+    }
+
+    @Override
+    public void putFloat(float f) {
+        pos += 4;
+    }
+
+    @Override
+    public void putDouble(double d) {
+        pos += 8;
+    }
+
+    @Override
+    public void put(byte[] src, int offset, int length) {
+        pos += length;
+    }
+
+    @Override
+    public void putShort(short s) {
+        pos += 2;
+    }
+
+    @Override
+    public void putInt(int i) {
+        pos += 4;
+    }
+
+    @Override
+    public void putLong(long l) {
+        pos += 8;
+    }
+
+    @Override
+    public int remaining() {
+        return Integer.MAX_VALUE - pos;
+    }
+
+    @Override
+    public int position() {
+        return pos;
+    }
+
+    @Override
+    public void position(int position) {
+        pos = position;
+    }
+
+    @Override
+    public void put(ByteBuffer payload) {
+        pos += payload.remaining();
+        payload.position(payload.limit());
+    }
+}

Copied: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/EncodedMessage.java (from r1393790, activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/OutboundTransformer.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/EncodedMessage.java?p2=activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/EncodedMessage.java&p1=activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/OutboundTransformer.java&r1=1393790&r2=1394264&rev=1394264&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/OutboundTransformer.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/EncodedMessage.java Thu Oct  4 21:00:25 2012
@@ -16,37 +16,21 @@
  */
 package org.apache.activemq.transport.amqp.transform;
 
-import org.apache.qpid.proton.engine.Delivery;
-
-import javax.jms.Message;
+import org.apache.qpid.proton.type.Binary;
 
 /**
-* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
-*/
-public abstract class OutboundTransformer {
-
-    JMSVendor vendor;
-    String prefixVendor = "JMS_AMQP_";
-
-    public OutboundTransformer(JMSVendor vendor) {
-        this.vendor = vendor;
-    }
-
-    public abstract byte[] transform(Message jms) throws Exception;
-
-    public String getPrefixVendor() {
-        return prefixVendor;
-    }
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class EncodedMessage extends Binary {
 
-    public void setPrefixVendor(String prefixVendor) {
-        this.prefixVendor = prefixVendor;
-    }
+    final long messageFormat;
 
-    public JMSVendor getVendor() {
-        return vendor;
+    public EncodedMessage(long messageFormat, byte[] data, int offset, int length) {
+        super(data, offset, length);
+        this.messageFormat = messageFormat;
     }
 
-    public void setVendor(JMSVendor vendor) {
-        this.vendor = vendor;
+    public long getMessageFormat() {
+        return messageFormat;
     }
 }

Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/InboundTransformer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/InboundTransformer.java?rev=1394264&r1=1394263&r2=1394264&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/InboundTransformer.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/InboundTransformer.java Thu Oct  4 21:00:25 2012
@@ -36,7 +36,7 @@ public abstract class InboundTransformer
         this.vendor = vendor;
     }
 
-    abstract public Message transform(long messageFormat, byte [] data, int offset, int len) throws Exception;
+    abstract public Message transform(EncodedMessage amqpMessage) throws Exception;
 
     public int getDefaultDeliveryMode() {
         return defaultDeliveryMode;

Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingInboundTransformer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingInboundTransformer.java?rev=1394264&r1=1394263&r2=1394264&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingInboundTransformer.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingInboundTransformer.java Thu Oct  4 21:00:25 2012
@@ -38,11 +38,13 @@ public class JMSMappingInboundTransforme
     }
 
     @Override
-    public Message transform(long messageFormat, byte [] amqpMessage, int offset, int len) throws Exception {
+    public Message transform(EncodedMessage amqpMessage) throws Exception {
         org.apache.qpid.proton.message.Message amqp = new org.apache.qpid.proton.message.Message();
 
+        int offset = amqpMessage.getArrayOffset();
+        int len = amqpMessage.getLength();
         while( len > 0 ) {
-            final int decoded = amqp.decode(amqpMessage, offset, len);
+            final int decoded = amqp.decode(amqpMessage.getArray(), offset, len);
             assert decoded > 0: "Make progress decoding the message";
             offset += decoded;
             len -= decoded;
@@ -110,7 +112,7 @@ public class JMSMappingInboundTransforme
                 rc.setBooleanProperty(prefixVendor + "FirstAcquirer", header.getFirstAcquirer());
             }
             if( header.getDeliveryCount()!=null ) {
-                rc.setLongProperty("JMSXDeliveryCount", header.getDeliveryCount().longValue());
+                vendor.setJMSXDeliveryCount(rc, header.getDeliveryCount().longValue());
             }
         }
 
@@ -187,7 +189,7 @@ public class JMSMappingInboundTransforme
             }
         }
 
-        rc.setLongProperty(prefixVendor + "MESSAGE_FORMAT", messageFormat);
+        rc.setLongProperty(prefixVendor + "MESSAGE_FORMAT", amqpMessage.getMessageFormat());
         rc.setBooleanProperty(prefixVendor + "NATIVE", false);
         return rc;
     }

Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingOutboundTransformer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingOutboundTransformer.java?rev=1394264&r1=1394263&r2=1394264&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingOutboundTransformer.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingOutboundTransformer.java Thu Oct  4 21:00:25 2012
@@ -16,43 +16,181 @@
  */
 package org.apache.activemq.transport.amqp.transform;
 
-import javax.jms.BytesMessage;
-import javax.jms.Message;
-import javax.jms.MessageFormatException;
+import org.apache.qpid.proton.codec.CompositeWritableBuffer;
+import org.apache.qpid.proton.codec.WritableBuffer;
+import org.apache.qpid.proton.type.Binary;
+import org.apache.qpid.proton.type.Symbol;
+import org.apache.qpid.proton.type.UnsignedByte;
+import org.apache.qpid.proton.type.UnsignedInteger;
+import org.apache.qpid.proton.type.messaging.*;
+
+import javax.jms.*;
+import java.io.ByteArrayOutputStream;
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.util.Date;
+import java.util.Enumeration;
+import java.util.HashMap;
 
 /**
 * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
 */
 public class JMSMappingOutboundTransformer extends OutboundTransformer {
 
+    String prefixDeliveryAnnotations = "DA_";
+    String prefixMessageAnnotations= "MA_";
+    String prefixFooter = "FT_";
 
     public JMSMappingOutboundTransformer(JMSVendor vendor) {
         super(vendor);
     }
 
     @Override
-    public byte[] transform(Message jms) throws Exception {
-        if( jms == null )
+    public EncodedMessage transform(Message msg) throws Exception {
+        if( msg == null )
             return null;
-        if( !(jms instanceof BytesMessage) )
+        try {
+            if( msg.getBooleanProperty(prefixVendor + "NATIVE") ) {
+                return null;
+            }
+        } catch (MessageFormatException e) {
             return null;
+        }
+        return transform(this, msg);
+    }
+
+    static EncodedMessage transform(JMSMappingOutboundTransformer options, Message msg) throws JMSException, UnsupportedEncodingException {
+        final JMSVendor vendor = options.vendor;
+
+        final String messageFormatKey = options.prefixVendor + "MESSAGE_FORMAT";
+        final String nativeKey = options.prefixVendor + "NATIVE";
+        final String firstAcquirerKey = options.prefixVendor + "FirstAcquirer";
+        final String prefixDeliveryAnnotationsKey = options.prefixVendor + options.prefixDeliveryAnnotations;
+        final String prefixMessageAnnotationsKey = options.prefixVendor + options.prefixMessageAnnotations;
+        final String subjectKey =  options.prefixVendor +"Subject";
+        final String contentTypeKey = options.prefixVendor +"ContentType";
+        final String contentEncodingKey = options.prefixVendor +"ContentEncoding";
+        final String replyToGroupIDKey = options.prefixVendor +"ReplyToGroupID";
+        final String prefixFooterKey = options.prefixVendor + options.prefixFooter;
 
         long messageFormat;
         try {
-            if( !jms.getBooleanProperty(prefixVendor + "NATIVE") ) {
-                return null;
-            }
-            messageFormat = jms.getLongProperty(prefixVendor + "MESSAGE_FORMAT");
+            messageFormat = msg.getLongProperty(messageFormatKey);
         } catch (MessageFormatException e) {
             return null;
         }
 
-        // TODO: Proton should probably expose a way to set the msg format
-        // delivery.settMessageFormat(messageFormat);
+        Header header = new Header();
+        Properties props=new Properties();
+        HashMap daMap = null;
+        HashMap maMap = null;
+        HashMap apMap = null;
+        Section body=null;
+        HashMap footerMap = null;
+        if( msg instanceof BytesMessage ) {
+            BytesMessage m = (BytesMessage)msg;
+            byte data[] = new byte[(int) m.getBodyLength()];
+            m.readBytes(data);
+            body = new Data(new Binary(data));
+        } if( msg instanceof TextMessage ) {
+            body = new AmqpValue(((TextMessage) msg).getText());
+        } if( msg instanceof MapMessage ) {
+            throw new RuntimeException("Not implemented");
+        } if( msg instanceof StreamMessage ) {
+            throw new RuntimeException("Not implemented");
+        } if( msg instanceof ObjectMessage ) {
+            throw new RuntimeException("Not implemented");
+        }
+
+        header.setDurable(msg.getJMSDeliveryMode() == DeliveryMode.PERSISTENT ? true : false);
+        header.setPriority(new UnsignedByte((byte) msg.getJMSPriority()));
+        if( msg.getJMSExpiration() != 0 ) {
+            header.setTtl(new UnsignedInteger((int) msg.getJMSExpiration()));
+        }
+        if( msg.getJMSType()!=null ) {
+            if( maMap==null ) maMap = new HashMap();
+            maMap.put("x-opt-jms-type", msg.getJMSType());
+        }
+        if( msg.getJMSMessageID()!=null ) {
+            props.setMessageId(msg.getJMSMessageID());
+        }
+        if( msg.getJMSDestination()!=null ) {
+            props.setTo(vendor.toAddress(msg.getJMSDestination()));
+        }
+        if( msg.getJMSReplyTo()!=null ) {
+            props.setReplyTo(vendor.toAddress(msg.getJMSDestination()));
+        }
+        if( msg.getJMSCorrelationID()!=null ) {
+            props.setCorrelationId(msg.getJMSCorrelationID());
+        }
+        if( msg.getJMSExpiration() != 0 ) {
+            props.setAbsoluteExpiryTime(new Date(msg.getJMSExpiration()));
+        }
+        if( msg.getJMSTimestamp()!= 0 ) {
+            props.setCreationTime(new Date(msg.getJMSTimestamp()));
+        }
+
+        final Enumeration keys = msg.getPropertyNames();
+        while (keys.hasMoreElements()) {
+            String key = (String) keys.nextElement();
+            if( key.equals(messageFormatKey) || key.equals(nativeKey)) {
+                // skip..
+            } else if( key.equals(firstAcquirerKey) ) {
+                header.setFirstAcquirer(msg.getBooleanProperty(key));
+            } else if( key.startsWith("JMSXDeliveryCount") ) {
+                header.setDeliveryCount(new UnsignedInteger(msg.getIntProperty(key)));
+            } else if( key.startsWith("JMSXUserID") ) {
+                props.setUserId(new Binary(msg.getStringProperty(key).getBytes("UTF-8")));
+            } else if( key.startsWith("JMSXGroupID") ) {
+                props.setGroupId(msg.getStringProperty(key));
+            } else if( key.startsWith("JMSXGroupSeq") ) {
+                props.setGroupSequence(new UnsignedInteger(msg.getIntProperty(key)));
+            } else if( key.startsWith(prefixDeliveryAnnotationsKey) ) {
+                if( daMap == null ) daMap = new HashMap();
+                String name = key.substring(prefixDeliveryAnnotationsKey.length());
+                daMap.put(name, msg.getObjectProperty(key));
+            } else if( key.startsWith(prefixMessageAnnotationsKey) ) {
+                if( maMap==null ) maMap = new HashMap();
+                String name = key.substring(prefixMessageAnnotationsKey.length());
+                maMap.put(name, msg.getObjectProperty(key));
+            } else if( key.equals(subjectKey) ) {
+                props.setSubject(msg.getStringProperty(key));
+            } else if( key.equals(contentTypeKey) ) {
+                props.setContentType(Symbol.getSymbol(msg.getStringProperty(key)));
+            } else if( key.equals(contentEncodingKey) ) {
+                props.setContentEncoding(Symbol.getSymbol(msg.getStringProperty(key)));
+            } else if( key.equals(replyToGroupIDKey) ) {
+                props.setReplyToGroupId(msg.getStringProperty(key));
+            } else if( key.startsWith(prefixFooterKey) ) {
+                if( footerMap==null ) footerMap = new HashMap();
+                String name = key.substring(prefixFooterKey.length());
+                footerMap.put(name, msg.getObjectProperty(key));
+            } else {
+                if( apMap==null ) apMap = new HashMap();
+                apMap.put(key, msg.getObjectProperty(key));
+            }
+        }
+
+
+        MessageAnnotations ma=null;
+        if( maMap!=null ) ma = new MessageAnnotations(maMap);
+        DeliveryAnnotations da=null;
+        if( daMap!=null ) da = new DeliveryAnnotations(daMap);
+        ApplicationProperties ap=null;
+        if( apMap!=null ) ap = new ApplicationProperties(apMap);
+        Footer footer=null;
+        if( footerMap!=null ) footer = new Footer(footerMap);
+
+        org.apache.qpid.proton.message.Message amqp = new org.apache.qpid.proton.message.Message(header, da, ma, props, ap, body, footer);
+
+        ByteBuffer buffer = ByteBuffer.wrap(new byte[1024*4]);
+        final DroppingWritableBuffer overflow = new DroppingWritableBuffer();
+        int c = amqp.encode(new CompositeWritableBuffer(new WritableBuffer.ByteBufferWrapper(buffer), overflow));
+        if( overflow.position() > 0 ) {
+            buffer = ByteBuffer.wrap(new byte[1024*4+overflow.position()]);
+            c = amqp.encode(new WritableBuffer.ByteBufferWrapper(buffer));
+        }
 
-        BytesMessage bytesMessage = (BytesMessage) jms;
-        byte data[] = new byte[(int) bytesMessage.getBodyLength()];
-        bytesMessage.readBytes(data);
-        return data;
+        return new EncodedMessage(messageFormat, buffer.array(), 0, c);
     }
 }

Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSVendor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSVendor.java?rev=1394264&r1=1394263&r2=1394264&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSVendor.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSVendor.java Thu Oct  4 21:00:25 2012
@@ -19,11 +19,16 @@ abstract public class JMSVendor {
 
     public abstract MapMessage createMapMessage();
 
-    public abstract void setJMSXUserID(Message jms, String value);
+    public abstract void setJMSXUserID(Message msg, String value);
 
     public abstract Destination createDestination(String name);
 
-    public abstract void setJMSXGroupID(Message jms, String groupId);
+    public abstract void setJMSXGroupID(Message msg, String groupId);
+
+    public abstract void setJMSXGroupSequence(Message msg, int i);
+
+    public abstract void setJMSXDeliveryCount(Message rc, long l);
+
+    public abstract String toAddress(Destination msgDestination);
 
-    public abstract void setJMSXGroupSequence(Message jms, int i);
 }

Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/OutboundTransformer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/OutboundTransformer.java?rev=1394264&r1=1394263&r2=1394264&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/OutboundTransformer.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/OutboundTransformer.java Thu Oct  4 21:00:25 2012
@@ -32,7 +32,7 @@ public abstract class OutboundTransforme
         this.vendor = vendor;
     }
 
-    public abstract byte[] transform(Message jms) throws Exception;
+    public abstract EncodedMessage transform(Message jms) throws Exception;
 
     public String getPrefixVendor() {
         return prefixVendor;