You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/10/03 23:46:47 UTC

svn commit: r1393782 - in /activemq/trunk: ./ activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/ activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/

Author: chirino
Date: Wed Oct  3 21:46:46 2012
New Revision: 1393782

URL: http://svn.apache.org/viewvc?rev=1393782&view=rev
Log:
Making more progress on the AMQP implementation.

Added:
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ActiveMQJMSVendor.java
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeInboundTransformer.java
      - copied, changed from r1393500, activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPNativeInboundTransformer.java
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeOutboundTransformer.java
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/InboundTransformer.java
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingInboundTransformer.java
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingOutboundTransformer.java
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSVendor.java
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/OutboundTransformer.java
      - copied, changed from r1393500, activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/InboundTransformer.java
    activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
      - copied, changed from r1393500, activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTest.java
Removed:
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPNativeInboundTransformer.java
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSubscription.java
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/InboundTransformer.java
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/JMSMappingInboundTransformer.java
    activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTest.java
Modified:
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
    activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpNioTest.java
    activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpSslTest.java
    activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/SwiftMQClientTest.java
    activemq/trunk/pom.xml

Added: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ActiveMQJMSVendor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ActiveMQJMSVendor.java?rev=1393782&view=auto
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ActiveMQJMSVendor.java (added)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ActiveMQJMSVendor.java Wed Oct  3 21:46:46 2012
@@ -0,0 +1,67 @@
+package org.apache.activemq.transport.amqp;
+
+import org.apache.activemq.command.*;
+import org.apache.activemq.transport.amqp.transform.JMSVendor;
+
+import javax.jms.*;
+import javax.jms.Message;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class ActiveMQJMSVendor extends JMSVendor {
+
+    final public static ActiveMQJMSVendor INSTANCE = new ActiveMQJMSVendor();
+
+    private ActiveMQJMSVendor() {}
+
+    @Override
+    public BytesMessage createBytesMessage() {
+        return new ActiveMQBytesMessage();
+    }
+
+    @Override
+    public StreamMessage createStreamMessage() {
+        return new ActiveMQStreamMessage();
+    }
+
+    @Override
+    public Message createMessage() {
+        return new ActiveMQMessage();
+    }
+
+    @Override
+    public TextMessage createTextMessage() {
+        return new ActiveMQTextMessage();
+    }
+
+    @Override
+    public ObjectMessage createObjectMessage() {
+        return new ActiveMQObjectMessage();
+    }
+
+    @Override
+    public MapMessage createMapMessage() {
+        return new ActiveMQMapMessage();
+    }
+
+    @Override
+    public Destination createDestination(String name) {
+        return ActiveMQDestination.createDestination(name, ActiveMQDestination.QUEUE_TYPE);
+    }
+
+    @Override
+    public void setJMSXUserID(Message msg, String value) {
+        ((ActiveMQMessage)msg).setUserID(value);
+    }
+
+    @Override
+    public void setJMSXGroupID(Message msg, String value) {
+        ((ActiveMQMessage)msg).setGroupID(value);
+    }
+
+    @Override
+    public void setJMSXGroupSequence(Message msg, int value) {
+        ((ActiveMQMessage)msg).setGroupSequence(value);
+    }
+}

Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java?rev=1393782&r1=1393781&r2=1393782&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java Wed Oct  3 21:46:46 2012
@@ -18,26 +18,23 @@ package org.apache.activemq.transport.am
 
 import org.apache.activemq.broker.BrokerContext;
 import org.apache.activemq.command.*;
-import org.apache.activemq.command.Message;
+import org.apache.activemq.transport.amqp.transform.*;
 import org.apache.activemq.util.*;
 import org.apache.qpid.proton.engine.*;
 import org.apache.qpid.proton.engine.Session;
 import org.apache.qpid.proton.engine.impl.ConnectionImpl;
-import org.apache.qpid.proton.engine.impl.DeliveryImpl;
 import org.apache.qpid.proton.engine.impl.TransportImpl;
-import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.hawtbuf.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.jms.*;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
-import java.util.EnumSet;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.zip.Inflater;
+
+import org.fusesource.hawtbuf.ByteArrayOutputStream;
 
 class AmqpProtocolConverter {
 
@@ -80,7 +77,7 @@ class AmqpProtocolConverter {
         this.protonTransport.bind(this.protonConnection);
     }
 
-    void pumpOut() {
+    void pumpProtonToSocket() {
         try {
             int size = 1024 * 64;
             byte data[] = new byte[size];
@@ -158,11 +155,7 @@ class AmqpProtocolConverter {
 
             link = protonConnection.linkHead(ACTIVE_STATE, CLOSED_STATE);
             while (link != null) {
-                if (link instanceof Receiver) {
-//                    listener.onReceiverClose((Receiver) link);
-                } else {
-//                    listener.onSenderClose((Sender) link);
-                }
+                ((AmqpDeliveryListener)link.getContext()).onClose();
                 link.close();
                 link = link.next(ACTIVE_STATE, CLOSED_STATE);
             }
@@ -170,8 +163,7 @@ class AmqpProtocolConverter {
             session = protonConnection.sessionHead(ACTIVE_STATE, CLOSED_STATE);
             while (session != null) {
                 //TODO - close links?
-//                listener.onSessionClose(session);
-                session.close();
+                onSessionClose(session);
                 session = session.next(ACTIVE_STATE, CLOSED_STATE);
             }
             if (protonConnection.getLocalState() == EndpointState.ACTIVE && protonConnection.getRemoteState() == EndpointState.CLOSED) {
@@ -183,7 +175,7 @@ class AmqpProtocolConverter {
             handleException(new AmqpProtocolException("Could not process AMQP commands", true, e));
         }
 
-        pumpOut();
+        pumpProtonToSocket();
     }
 
     public void onActiveMQCommand(Command command) throws Exception {
@@ -223,6 +215,7 @@ class AmqpProtocolConverter {
 
     static abstract class AmqpDeliveryListener {
         abstract public void onDelivery(Delivery delivery) throws Exception;
+        public void onClose() throws Exception {}
     }
 
     private void onConnectionOpen() throws AmqpProtocolException {
@@ -255,14 +248,14 @@ class AmqpProtocolConverter {
             public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
 
                 protonConnection.open();
-                pumpOut();
+                pumpProtonToSocket();
 
                 if (response.isException()) {
                     Throwable exception = ((ExceptionResponse) response).getException();
 // TODO: figure out how to close /w an error.
 //                    protonConnection.setLocalError(new EndpointError(exception.getClass().getName(), exception.getMessage()));
                     protonConnection.close();
-                    pumpOut();
+                    pumpProtonToSocket();
                     amqpTransport.onException(IOExceptionSupport.create(exception));
                     return;
                 }
@@ -278,6 +271,12 @@ class AmqpProtocolConverter {
         session.open();
     }
 
+    private void onSessionClose(Session session) {
+        AmqpSessionContext sessionContext = (AmqpSessionContext)session.getContext();
+        sendToActiveMQ(new RemoveInfo(sessionContext.sessionId), null);
+        session.close();
+    }
+
     private void onLinkOpen(Link link) {
         link.setLocalSourceAddress(link.getRemoteSourceAddress());
         link.setLocalTargetAddress(link.getRemoteTargetAddress());
@@ -290,54 +289,54 @@ class AmqpProtocolConverter {
         }
     }
 
+    InboundTransformer inboundTransformer = new AMQPNativeInboundTransformer(ActiveMQJMSVendor.INSTANCE);
+
     class ProducerContext extends AmqpDeliveryListener {
         private final ProducerId producerId;
         private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
         private final ActiveMQDestination destination;
+        ByteArrayOutputStream current = new ByteArrayOutputStream();
 
         public ProducerContext(ProducerId producerId, ActiveMQDestination destination) {
             this.producerId = producerId;
             this.destination = destination;
         }
 
+
         @Override
-        public void onDelivery(Delivery delivery) throws JMSException {
-//            delivery.
-            ActiveMQMessage message = convertMessage((DeliveryImpl) delivery);
+        public void onDelivery(Delivery delivery) throws Exception {
+            if( current ==null ) {
+                current = new ByteArrayOutputStream();
+            }
+
+            Receiver receiver = ((Receiver)delivery.getLink());
+            int count;
+            byte data[] = new byte[1024*4];
+            while( (count = receiver.recv(data, 0, data.length)) > 0 ) {
+                current.write(data, 0, count);
+            }
+
+            // Expecting more deliveries..
+            if( count == 0 ) {
+                return;
+            }
+
+            final Buffer buffer = current.toBuffer();
+            final ActiveMQMessage message = (ActiveMQMessage) inboundTransformer.transform(delivery.getMessageFormat(), buffer.data, buffer.offset, buffer.length);
+            current = null;
+
+            if( message.getDestination()==null ) {
+                message.setJMSDestination(destination);
+            }
             message.setProducerId(producerId);
+            if( message.getMessageId()==null ) {
+                message.setMessageId(new MessageId(producerId, messageIdGenerator.getNextSequenceId()));
+            }
             message.onSend();
 //            sendToActiveMQ(message, createResponseHandler(command));
             sendToActiveMQ(message, null);
         }
 
-        ActiveMQMessage convertMessage(DeliveryImpl delivery) throws JMSException {
-            ActiveMQBytesMessage msg = nextMessage(delivery);
-            final Receiver receiver = (Receiver) delivery.getLink();
-            byte buff[] = new byte[1024 * 4];
-            int count = 0;
-            while ((count = receiver.recv(buff, 0, buff.length)) >= 0) {
-                msg.writeBytes(buff, 0, count);
-            }
-            return msg;
-        }
-
-        ActiveMQBytesMessage current;
-
-        private ActiveMQBytesMessage nextMessage(DeliveryImpl delivery) throws JMSException {
-            if (current == null) {
-                current = new ActiveMQBytesMessage();
-                current.setJMSDestination(destination);
-                current.setProducerId(producerId);
-                current.setMessageId(new MessageId(producerId, messageIdGenerator.getNextSequenceId()));
-                current.setTimestamp(System.currentTimeMillis());
-                current.setPriority((byte) javax.jms.Message.DEFAULT_PRIORITY);
-//            msg.setPersistent(command.qos() != QoS.AT_MOST_ONCE);
-//            msg.setIntProperty(QOS_PROPERTY_NAME, command.qos().ordinal());
-                System.out.println(delivery.getLocalState() + "/" + delivery.getRemoteState());
-            }
-            return current;
-        }
-
     }
 
 
@@ -345,7 +344,7 @@ class AmqpProtocolConverter {
         // Client is producing to this receiver object
 
         ProducerId producerId = new ProducerId(sessionContext.sessionId, sessionContext.nextProducerId++);
-        ActiveMQDestination destination = ActiveMQDestination.createDestination(receiver.getRemoteSourceAddress(), ActiveMQDestination.QUEUE_TYPE);
+        ActiveMQDestination destination = ActiveMQDestination.createDestination(receiver.getRemoteTargetAddress(), ActiveMQDestination.QUEUE_TYPE);
         ProducerContext producerContext = new ProducerContext(producerId, destination);
 
         receiver.setContext(producerContext);
@@ -360,12 +359,13 @@ class AmqpProtocolConverter {
                     Throwable exception = ((ExceptionResponse) response).getException();
                     receiver.close();
                 }
-                pumpOut();
+                pumpProtonToSocket();
             }
         });
 
     }
 
+    OutboundTransformer outboundTransformer = new AMQPNativeOutboundTransformer(ActiveMQJMSVendor.INSTANCE);
 
     class ConsumerContext extends AmqpDeliveryListener {
         private final ConsumerId consumerId;
@@ -395,83 +395,66 @@ class AmqpProtocolConverter {
             this.sender = sender;
         }
 
-        // called when the connection receives a JMS message from ActiveMQ
-        public void onMessageDispatch(MessageDispatch md) throws Exception {
-            final byte[] tag = nextTag();
-            final Delivery delivery = sender.delivery(tag, 0, tag.length);
-            delivery.setContext(md);
+        @Override
+        public void onClose() throws Exception {
+            sendToActiveMQ(new RemoveInfo(consumerId), null);
+        }
 
-            // Covert to an AMQP messages.
-            org.apache.qpid.proton.message.Message msg = convertMessage(md.getMessage());
-            byte buffer[] = new byte[1024*4];
-            int c=0;
+        LinkedList<MessageDispatch> outbound = new LinkedList<MessageDispatch>();
 
-            // And send the AMQP message over the link.
-            while( (c=msg.encode(buffer, 0 , 0)) >= 0 ) {
-                sender.send(buffer, 0, c);
-            }
-            sender.advance();
+        // called when the connection receives a JMS message from ActiveMQ
+        public void onMessageDispatch(MessageDispatch md) throws Exception {
+            outbound.addLast(md);
+            pumpOutbound();
+            pumpProtonToSocket();
+        }
+
+        Buffer current;
+
+        public void pumpOutbound() {
+            while(true) {
+
+                while( current!=null ) {
+                    int sent = sender.send(current.data, current.offset, current.length);
+                    if( sent > 0 ) {
+                        current.moveHead(sent);
+                        if( current.length == 0 ) {
+                            sender.advance();
+                            current = null;
+                        }
+                    } else {
+                        return;
+                    }
+                }
 
-        }
+                if( outbound.isEmpty() ) {
+                    return;
+                }
 
-        public org.apache.qpid.proton.message.Message convertMessage(Message message) throws Exception {
-//            result.setContentEncoding();
-//            QoS qoS;
-//            if (message.propertyExists(QOS_PROPERTY_NAME)) {
-//                int ordinal = message.getIntProperty(QOS_PROPERTY_NAME);
-//                qoS = QoS.values()[ordinal];
-//
-//            } else {
-//                qoS = message.isPersistent() ? QoS.AT_MOST_ONCE : QoS.AT_LEAST_ONCE;
-//            }
-//            result.qos(qoS);
+                final MessageDispatch md = outbound.removeFirst();
+                final byte[] tag = nextTag();
+                final Delivery delivery = sender.delivery(tag, 0, tag.length);
+                delivery.setContext(md);
 
-            Buffer content = null;
-            if (message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE) {
-                ActiveMQTextMessage msg = (ActiveMQTextMessage) message.copy();
-                msg.setReadOnlyBody(true);
-                String messageText = msg.getText();
-                content = new Buffer(messageText.getBytes("UTF-8"));
-            } else if (message.getDataStructureType() == ActiveMQBytesMessage.DATA_STRUCTURE_TYPE) {
-                ActiveMQBytesMessage msg = (ActiveMQBytesMessage) message.copy();
-                msg.setReadOnlyBody(true);
-                byte[] data = new byte[(int) msg.getBodyLength()];
-                msg.readBytes(data);
-                content = new Buffer(data);
-            } else if (message.getDataStructureType() == ActiveMQMapMessage.DATA_STRUCTURE_TYPE) {
-                ActiveMQMapMessage msg = (ActiveMQMapMessage) message.copy();
-                msg.setReadOnlyBody(true);
-                Map map = msg.getContentMap();
-                content = new Buffer(map.toString().getBytes("UTF-8"));
-            } else {
-                ByteSequence byteSequence = message.getContent();
-                if (byteSequence != null && byteSequence.getLength() > 0) {
-                    if (message.isCompressed()) {
-                        Inflater inflater = new Inflater();
-                        inflater.setInput(byteSequence.data, byteSequence.offset, byteSequence.length);
-                        byte[] data = new byte[4096];
-                        int read;
-                        ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
-                        while ((read = inflater.inflate(data)) != 0) {
-                            bytesOut.write(data, 0, read);
-                        }
-                        byteSequence = bytesOut.toByteSequence();
+                try {
+                    final ActiveMQMessage jms = (ActiveMQMessage) md.getMessage();
+                    final byte[] amqpMessage = outboundTransformer.transform(jms);
+                    if( amqpMessage!=null && amqpMessage.length > 0 ) {
+                        current = new Buffer(amqpMessage);
+                    } else {
+                        // TODO: message could not be generated what now?
                     }
-                    content = new Buffer(byteSequence.data, byteSequence.offset, byteSequence.length);
-                } else {
-                    content = new Buffer(0);
+                } catch (Exception e) {
+                    e.printStackTrace();
                 }
             }
-
-            org.apache.qpid.proton.message.Message result = new org.apache.qpid.proton.message.Message();
-            return result;
         }
 
-
         @Override
         public void onDelivery(Delivery delivery) throws JMSException {
             if( delivery.remotelySettled() ) {
                 MessageDispatch md = (MessageDispatch) delivery.getContext();
+                pumpOutbound();
             }
         }
 
@@ -501,38 +484,12 @@ class AmqpProtocolConverter {
                     Throwable exception = ((ExceptionResponse) response).getException();
                     sender.close();
                 }
-                pumpOut();
+                pumpProtonToSocket();
             }
         });
 
     }
 
-//
-//    QoS onSubscribe(SUBSCRIBE command, Topic topic) throws AmqpProtocolException {
-//        ActiveMQDestination destination = new ActiveMQTopic(convertAMQPToActiveMQ(topic.name().toString()));
-//        if (destination == null) {
-//            throw new AmqpProtocolException("Invalid Destination.");
-//        }
-//
-//        ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
-//        ConsumerInfo consumerInfo = new ConsumerInfo(id);
-//        consumerInfo.setDestination(destination);
-//        consumerInfo.setPrefetchSize(1000);
-//        consumerInfo.setDispatchAsync(true);
-//        if (!connect.cleanSession() && (connect.clientId() != null)) {
-//            //by default subscribers are persistent
-//            consumerInfo.setSubscriptionName(connect.clientId().toString());
-//        }
-//
-//        AmqpSubscription amqpSubscription = new AmqpSubscription(this, topic.qos(), consumerInfo);
-//
-//
-//        amqpSubscriptionByTopic.put(topic.name(), amqpSubscription);
-//
-//        sendToActiveMQ(consumerInfo, null);
-//        return topic.qos();
-//    }
-//
 //    void onUnSubscribe(UNSUBSCRIBE command) {
 //        UTF8Buffer[] topics = command.topics();
 //        if (topics != null) {

Copied: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeInboundTransformer.java (from r1393500, activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPNativeInboundTransformer.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeInboundTransformer.java?p2=activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeInboundTransformer.java&p1=activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPNativeInboundTransformer.java&r1=1393500&r2=1393782&rev=1393782&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPNativeInboundTransformer.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeInboundTransformer.java Wed Oct  3 21:46:46 2012
@@ -14,11 +14,38 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.transport.amqp;
+package org.apache.activemq.transport.amqp.transform;
+
+import javax.jms.BytesMessage;
+import javax.jms.Message;
 
 /**
 * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
 */
 public class AMQPNativeInboundTransformer extends InboundTransformer {
 
+
+    public AMQPNativeInboundTransformer(JMSVendor vendor) {
+        super(vendor);
+    }
+
+    @Override
+    public Message transform(long messageFormat, byte [] amqpMessage, int offset, int len) throws Exception {
+
+        BytesMessage rc = vendor.createBytesMessage();
+        rc.writeBytes(amqpMessage, offset, len);
+
+        rc.setJMSDeliveryMode(defaultDeliveryMode);
+        rc.setJMSPriority(defaultPriority);
+
+        final long now = System.currentTimeMillis();
+        rc.setJMSTimestamp(now);
+        if( defaultTtl > 0 ) {
+            rc.setJMSExpiration(now + defaultTtl);
+        }
+
+        rc.setLongProperty(prefixVendor + "MESSAGE_FORMAT", messageFormat);
+        rc.setBooleanProperty(prefixVendor + "NATIVE", false);
+        return rc;
+    }
 }

Added: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeOutboundTransformer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeOutboundTransformer.java?rev=1393782&view=auto
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeOutboundTransformer.java (added)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeOutboundTransformer.java Wed Oct  3 21:46:46 2012
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.transform;
+
+import javax.jms.BytesMessage;
+import javax.jms.Message;
+import javax.jms.MessageFormatException;
+
+/**
+* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+*/
+public class AMQPNativeOutboundTransformer extends OutboundTransformer {
+
+    public AMQPNativeOutboundTransformer(JMSVendor vendor) {
+        super(vendor);
+    }
+
+    @Override
+    public byte[] transform(Message jms) throws Exception {
+        if( jms == null )
+            return null;
+        if( !(jms instanceof BytesMessage) )
+            return null;
+
+        long messageFormat;
+        try {
+            if( !jms.getBooleanProperty(prefixVendor + "NATIVE") ) {
+                return null;
+            }
+            messageFormat = jms.getLongProperty(prefixVendor + "MESSAGE_FORMAT");
+        } catch (MessageFormatException e) {
+            return null;
+        }
+
+        // TODO: Proton should probably expose a way to set the msg format
+        // delivery.settMessageFormat(messageFormat);
+
+        BytesMessage bytesMessage = (BytesMessage) jms;
+        byte data[] = new byte[(int) bytesMessage.getBodyLength()];
+        bytesMessage.readBytes(data);
+        return data;
+    }
+
+
+}

Added: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/InboundTransformer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/InboundTransformer.java?rev=1393782&view=auto
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/InboundTransformer.java (added)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/InboundTransformer.java Wed Oct  3 21:46:46 2012
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.transform;
+
+import org.apache.qpid.proton.engine.Delivery;
+
+import javax.jms.Message;
+import java.io.IOException;
+
+/**
+* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+*/
+public abstract class InboundTransformer {
+
+    JMSVendor vendor;
+    String prefixVendor = "JMS_AMQP_";
+    int defaultDeliveryMode = javax.jms.Message.DEFAULT_DELIVERY_MODE;
+    int defaultPriority = javax.jms.Message.DEFAULT_PRIORITY;
+    long defaultTtl = javax.jms.Message.DEFAULT_TIME_TO_LIVE;
+
+    public InboundTransformer(JMSVendor vendor) {
+        this.vendor = vendor;
+    }
+
+    abstract public Message transform(long messageFormat, byte [] data, int offset, int len) throws Exception;
+
+    public int getDefaultDeliveryMode() {
+        return defaultDeliveryMode;
+    }
+
+    public void setDefaultDeliveryMode(int defaultDeliveryMode) {
+        this.defaultDeliveryMode = defaultDeliveryMode;
+    }
+
+    public int getDefaultPriority() {
+        return defaultPriority;
+    }
+
+    public void setDefaultPriority(int defaultPriority) {
+        this.defaultPriority = defaultPriority;
+    }
+
+    public long getDefaultTtl() {
+        return defaultTtl;
+    }
+
+    public void setDefaultTtl(long defaultTtl) {
+        this.defaultTtl = defaultTtl;
+    }
+
+    public String getPrefixVendor() {
+        return prefixVendor;
+    }
+
+    public void setPrefixVendor(String prefixVendor) {
+        this.prefixVendor = prefixVendor;
+    }
+
+    public JMSVendor getVendor() {
+        return vendor;
+    }
+
+    public void setVendor(JMSVendor vendor) {
+        this.vendor = vendor;
+    }
+}

Added: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingInboundTransformer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingInboundTransformer.java?rev=1393782&view=auto
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingInboundTransformer.java (added)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingInboundTransformer.java Wed Oct  3 21:46:46 2012
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.transform;
+
+import org.apache.qpid.proton.type.Binary;
+import org.apache.qpid.proton.type.messaging.*;
+
+import javax.jms.*;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+*/
+public class JMSMappingInboundTransformer extends InboundTransformer {
+
+    String prefixDeliveryAnnotations = "DA_";
+    String prefixMessageAnnotations= "MA_";
+    String prefixFooter = "FT_";
+
+    public JMSMappingInboundTransformer(JMSVendor vendor) {
+        super(vendor);
+    }
+
+    @Override
+    public Message transform(long messageFormat, byte [] amqpMessage, int offset, int len) throws Exception {
+        org.apache.qpid.proton.message.Message amqp = new org.apache.qpid.proton.message.Message();
+
+        while( len > 0 ) {
+            final int decoded = amqp.decode(amqpMessage, offset, len);
+            assert decoded > 0: "Make progress decoding the message";
+            offset += decoded;
+            len -= decoded;
+        }
+
+        Message rc;
+        final Section body = amqp.getBody();
+        if( body instanceof Data ) {
+            Binary d = ((Data) body).getValue();
+            BytesMessage m = vendor.createBytesMessage();
+            m.writeBytes(d.getArray(), d.getArrayOffset(), d.getLength());
+            rc = m;
+        } else if (body instanceof AmqpSequence ) {
+            AmqpSequence sequence = (AmqpSequence) body;
+            StreamMessage m = vendor.createStreamMessage();
+            throw new RuntimeException("not implemented");
+//                jms = m;
+        } else if (body instanceof AmqpValue) {
+            Object value = ((AmqpValue) body).getValue();
+            if( value == null ) {
+                rc = vendor.createMessage();
+            } if( value instanceof String ) {
+                TextMessage m = vendor.createTextMessage();
+                m.setText((String) value);
+                rc = m;
+            } else if( value instanceof Binary ) {
+                Binary d = (Binary) value;
+                BytesMessage m = vendor.createBytesMessage();
+                m.writeBytes(d.getArray(), d.getArrayOffset(), d.getLength());
+                rc = m;
+            } else if( value instanceof List) {
+                List d = (List) value;
+                StreamMessage m = vendor.createStreamMessage();
+                throw new RuntimeException("not implemented");
+//                    jms = m;
+            } else if( value instanceof Map) {
+                Map d = (Map) value;
+                MapMessage m = vendor.createMapMessage();
+                throw new RuntimeException("not implemented");
+//                    jms = m;
+            } else {
+                ObjectMessage m = vendor.createObjectMessage();
+                throw new RuntimeException("not implemented");
+//                    jms = m;
+            }
+        } else {
+            throw new RuntimeException("Unexpected body type.");
+        }
+        rc.setJMSDeliveryMode(defaultDeliveryMode);
+        rc.setJMSPriority(defaultPriority);
+        rc.setJMSExpiration(defaultTtl);
+
+        final Header header = amqp.getHeader();
+        if( header!=null ) {
+            if( header.getDurable()!=null ) {
+                rc.setJMSDeliveryMode(header.getDurable().booleanValue() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+            }
+            if( header.getPriority()!=null ) {
+                rc.setJMSPriority(header.getPriority().intValue());
+            }
+            if( header.getTtl()!=null ) {
+                rc.setJMSExpiration(header.getTtl().longValue());
+            }
+            if( header.getFirstAcquirer() !=null ) {
+                rc.setBooleanProperty(prefixVendor + "FirstAcquirer", header.getFirstAcquirer());
+            }
+            if( header.getDeliveryCount()!=null ) {
+                rc.setLongProperty("JMSXDeliveryCount", header.getDeliveryCount().longValue());
+            }
+        }
+
+        final DeliveryAnnotations da = amqp.getDeliveryAnnotations();
+        if( da!=null ) {
+            for (Map.Entry entry : (Set<Map.Entry>)da.getValue().entrySet()) {
+                String key = entry.getKey().toString();
+                setProperty(rc, prefixVendor + prefixDeliveryAnnotations + key, entry.getValue());
+            }
+        }
+
+        final MessageAnnotations ma = amqp.getMessageAnnotations();
+        if( ma!=null ) {
+            for (Map.Entry entry : (Set<Map.Entry>)ma.getValue().entrySet()) {
+                String key = entry.getKey().toString();
+                setProperty(rc, prefixVendor + prefixMessageAnnotations + key, entry.getValue());
+            }
+        }
+
+        final Properties properties = amqp.getProperties();
+        if( properties!=null ) {
+            if( properties.getMessageId()!=null ) {
+                rc.setJMSMessageID(properties.getMessageId().toString());
+            }
+            Binary userId = properties.getUserId();
+            if( userId!=null ) {
+                vendor.setJMSXUserID(rc, new String(userId.getArray(), userId.getArrayOffset(), userId.getLength(), "UTF-8"));
+            }
+            if( properties.getTo()!=null ) {
+                rc.setJMSDestination(vendor.createDestination(properties.getTo()));
+            }
+            if( properties.getSubject()!=null ) {
+                rc.setStringProperty(prefixVendor + "Subject", properties.getSubject());
+            }
+            if( properties.getReplyTo() !=null ) {
+                rc.setJMSReplyTo(vendor.createDestination(properties.getReplyTo()));
+            }
+            if( properties.getCorrelationId() !=null ) {
+                rc.setJMSCorrelationID(properties.getCorrelationId().toString());
+            }
+            if( properties.getContentType() !=null ) {
+                rc.setStringProperty(prefixVendor + "ContentType", properties.getContentType().toString());
+            }
+            if( properties.getContentEncoding() !=null ) {
+                rc.setStringProperty(prefixVendor + "ContentEncoding", properties.getContentEncoding().toString());
+            }
+            if( properties.getCreationTime()!=null ) {
+                rc.setJMSTimestamp(properties.getCreationTime().getTime());
+            }
+            if( properties.getGroupId()!=null ) {
+                vendor.setJMSXGroupID(rc, properties.getGroupId());
+            }
+            if( properties.getGroupSequence()!=null ) {
+                vendor.setJMSXGroupSequence(rc, properties.getGroupSequence().intValue());
+            }
+            if( properties.getReplyToGroupId()!=null ) {
+                rc.setStringProperty(prefixVendor + "ReplyToGroupID", properties.getReplyToGroupId());
+            }
+        }
+
+        final ApplicationProperties ap = amqp.getApplicationProperties();
+        if( da!=null ) {
+            for (Map.Entry entry : (Set<Map.Entry>)ap.getValue().entrySet()) {
+                String key = entry.getKey().toString();
+                setProperty(rc, key, entry.getValue());
+            }
+        }
+
+        final Footer fp = amqp.getFooter();
+        if( da!=null ) {
+            for (Map.Entry entry : (Set<Map.Entry>)fp.getValue().entrySet()) {
+                String key = entry.getKey().toString();
+                setProperty(rc, prefixVendor + prefixFooter + key, entry.getValue());
+            }
+        }
+
+        rc.setLongProperty(prefixVendor + "MESSAGE_FORMAT", messageFormat);
+        rc.setBooleanProperty(prefixVendor + "NATIVE", false);
+        return rc;
+    }
+
+    private void setProperty(Message msg, String key, Object value) throws JMSException {
+        if( value instanceof String ) {
+            msg.setStringProperty(key, (String) value);
+//        } else if( value instanceof Integer ) {
+//            msg.setIntProperty(key, ((Integer) value).intValue());
+//        } else if( value instanceof Long ) {
+//            msg.setLongProperty(key, ((Long) value).longValue());
+        } else {
+            throw new RuntimeException("Unexpected value type: "+value.getClass());
+        }
+    }
+}

Added: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingOutboundTransformer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingOutboundTransformer.java?rev=1393782&view=auto
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingOutboundTransformer.java (added)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingOutboundTransformer.java Wed Oct  3 21:46:46 2012
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.transform;
+
+import javax.jms.BytesMessage;
+import javax.jms.Message;
+import javax.jms.MessageFormatException;
+
+/**
+* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+*/
+public class JMSMappingOutboundTransformer extends OutboundTransformer {
+
+
+    public JMSMappingOutboundTransformer(JMSVendor vendor) {
+        super(vendor);
+    }
+
+    @Override
+    public byte[] transform(Message jms) throws Exception {
+        if( jms == null )
+            return null;
+        if( !(jms instanceof BytesMessage) )
+            return null;
+
+        long messageFormat;
+        try {
+            if( !jms.getBooleanProperty(prefixVendor + "NATIVE") ) {
+                return null;
+            }
+            messageFormat = jms.getLongProperty(prefixVendor + "MESSAGE_FORMAT");
+        } catch (MessageFormatException e) {
+            return null;
+        }
+
+        // TODO: Proton should probably expose a way to set the msg format
+        // delivery.settMessageFormat(messageFormat);
+
+        BytesMessage bytesMessage = (BytesMessage) jms;
+        byte data[] = new byte[(int) bytesMessage.getBodyLength()];
+        bytesMessage.readBytes(data);
+        return data;
+    }
+}

Added: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSVendor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSVendor.java?rev=1393782&view=auto
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSVendor.java (added)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSVendor.java Wed Oct  3 21:46:46 2012
@@ -0,0 +1,29 @@
+package org.apache.activemq.transport.amqp.transform;
+
+import javax.jms.*;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+abstract public class JMSVendor {
+
+    public abstract BytesMessage createBytesMessage();
+
+    public abstract StreamMessage createStreamMessage();
+
+    public abstract Message createMessage();
+
+    public abstract TextMessage createTextMessage();
+
+    public abstract ObjectMessage createObjectMessage();
+
+    public abstract MapMessage createMapMessage();
+
+    public abstract void setJMSXUserID(Message jms, String value);
+
+    public abstract Destination createDestination(String name);
+
+    public abstract void setJMSXGroupID(Message jms, String groupId);
+
+    public abstract void setJMSXGroupSequence(Message jms, int i);
+}

Copied: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/OutboundTransformer.java (from r1393500, activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/InboundTransformer.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/OutboundTransformer.java?p2=activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/OutboundTransformer.java&p1=activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/InboundTransformer.java&r1=1393500&r2=1393782&rev=1393782&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/InboundTransformer.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/OutboundTransformer.java Wed Oct  3 21:46:46 2012
@@ -14,16 +14,39 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.transport.amqp;
+package org.apache.activemq.transport.amqp.transform;
+
+import org.apache.qpid.proton.engine.Delivery;
+
+import javax.jms.Message;
 
 /**
 * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
 */
-public class InboundTransformer {
+public abstract class OutboundTransformer {
 
+    JMSVendor vendor;
     String prefixVendor = "JMS_AMQP_";
-    int defaultDeliveryMode = javax.jms.Message.DEFAULT_DELIVERY_MODE;
-    int defaultPriority = javax.jms.Message.DEFAULT_PRIORITY;
-    long defaultTtl = javax.jms.Message.DEFAULT_TIME_TO_LIVE;
 
+    public OutboundTransformer(JMSVendor vendor) {
+        this.vendor = vendor;
+    }
+
+    public abstract byte[] transform(Message jms) throws Exception;
+
+    public String getPrefixVendor() {
+        return prefixVendor;
+    }
+
+    public void setPrefixVendor(String prefixVendor) {
+        this.prefixVendor = prefixVendor;
+    }
+
+    public JMSVendor getVendor() {
+        return vendor;
+    }
+
+    public void setVendor(JMSVendor vendor) {
+        this.vendor = vendor;
+    }
 }

Modified: activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpNioTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpNioTest.java?rev=1393782&r1=1393781&r2=1393782&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpNioTest.java (original)
+++ activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpNioTest.java Wed Oct  3 21:46:46 2012
@@ -18,7 +18,7 @@ package org.apache.activemq.transport.am
 
 import org.apache.activemq.broker.BrokerService;
 
-public class AmqpNioTest extends AmqpTest {
+public class AmqpNioTest extends AmqpTestSupport {
     protected void addAMQPConnector(BrokerService brokerService) throws Exception {
         brokerService.addConnector("amqp+nio://localhost:1883?maxInactivityDuration=-1");
     }

Modified: activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpSslTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpSslTest.java?rev=1393782&r1=1393781&r2=1393782&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpSslTest.java (original)
+++ activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpSslTest.java Wed Oct  3 21:46:46 2012
@@ -28,7 +28,7 @@ import java.security.cert.CertificateExc
 import java.security.cert.X509Certificate;
 
 @Ignore("hangs atm, needs investigation")
-public class AmqpSslTest extends AmqpTest {
+public class AmqpSslTest extends AmqpTestSupport {
     public void startBroker() throws Exception {
         System.setProperty("javax.net.ssl.trustStore", "src/test/resources/client.keystore");
         System.setProperty("javax.net.ssl.trustStorePassword", "password");

Copied: activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java (from r1393500, activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTest.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java?p2=activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java&p1=activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTest.java&r1=1393500&r2=1393782&rev=1393782&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTest.java (original)
+++ activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java Wed Oct  3 21:46:46 2012
@@ -16,34 +16,25 @@
  */
 package org.apache.activemq.transport.amqp;
 
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
+import junit.framework.TestCase;
 import org.apache.activemq.AutoFailTestSupport;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.TransportConnector;
-import org.apache.activemq.command.ActiveMQMessage;
-import org.apache.activemq.util.ByteSequence;
 import org.junit.After;
 import org.junit.Before;
-import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
 import java.util.Vector;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
 
 import static org.fusesource.hawtbuf.UTF8Buffer.utf8;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 
 
-public class AmqpTest {
-    protected static final Logger LOG = LoggerFactory.getLogger(AmqpTest.class);
+public class AmqpTestSupport {
+
+    protected static final Logger LOG = LoggerFactory.getLogger(AmqpTestSupport.class);
     protected BrokerService brokerService;
     protected Vector<Throwable> exceptions = new Vector<Throwable>();
     protected int numberOfMessages;

Modified: activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/SwiftMQClientTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/SwiftMQClientTest.java?rev=1393782&r1=1393781&r2=1393782&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/SwiftMQClientTest.java (original)
+++ activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/SwiftMQClientTest.java Wed Oct  3 21:46:46 2012
@@ -27,7 +27,7 @@ import org.junit.Test;
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-public class SwiftMQClientTest extends AmqpTest {
+public class SwiftMQClientTest extends AmqpTestSupport {
 
     @Test
     public void testSendReceive() throws Exception {
@@ -64,6 +64,7 @@ public class SwiftMQClientTest extends A
                 p.close();
                 session.close();
             }
+
 //            {
 //                Session session = connection.createSession(10, 10);
 //                Consumer c = session.createConsumer(queue, 100, qos, true, null);

Modified: activemq/trunk/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/pom.xml?rev=1393782&r1=1393781&r2=1393782&view=diff
==============================================================================
--- activemq/trunk/pom.xml (original)
+++ activemq/trunk/pom.xml Wed Oct  3 21:46:46 2012
@@ -1373,6 +1373,12 @@
 
   <profiles>
     <profile>
+      <id>unstable</id>
+      <modules>
+        <module>activemq-amqp</module>
+      </modules>
+    </profile>
+    <profile>
       <id>apache-release</id>
       <activation>
         <property>