You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/10/19 16:28:03 UTC

svn commit: r1400116 - in /activemq/trunk/activemq-amqp/src: main/java/org/apache/activemq/transport/amqp/ main/java/org/apache/activemq/transport/amqp/transform/ main/resources/META-INF/services/org/apache/activemq/transport/ test/java/org/apache/acti...

Author: chirino
Date: Fri Oct 19 14:28:02 2012
New Revision: 1400116

URL: http://svn.apache.org/viewvc?rev=1400116&view=rev
Log:
More AMQP impl changes.  More tests pass, initial cut of transaction support is in.

Added:
    activemq/trunk/activemq-amqp/src/test/resources/keystore
Modified:
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeOutboundTransformer.java
    activemq/trunk/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp+ssl
    activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
    activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
    activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/joram/ActiveMQAdmin.java
    activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/joram/JoramJmsTest.java

Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java?rev=1400116&r1=1400115&r2=1400116&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java Fri Oct 19 14:28:02 2012
@@ -24,7 +24,18 @@ import org.apache.activemq.util.IdGenera
 import org.apache.activemq.util.LongSequenceGenerator;
 import org.apache.qpid.proton.engine.*;
 import org.apache.qpid.proton.engine.impl.ConnectionImpl;
+import org.apache.qpid.proton.engine.impl.ProtocolTracer;
 import org.apache.qpid.proton.engine.impl.TransportImpl;
+import org.apache.qpid.proton.framing.TransportFrame;
+import org.apache.qpid.proton.type.Binary;
+import org.apache.qpid.proton.type.messaging.*;
+import org.apache.qpid.proton.type.messaging.Modified;
+import org.apache.qpid.proton.type.messaging.Rejected;
+import org.apache.qpid.proton.type.messaging.Released;
+import org.apache.qpid.proton.type.transaction.*;
+import org.apache.qpid.proton.type.transport.DeliveryState;
+import org.apache.qpid.proton.type.transport.SenderSettleMode;
+import org.apache.qpid.proton.type.transport.Source;
 import org.fusesource.hawtbuf.Buffer;
 import org.fusesource.hawtbuf.ByteArrayOutputStream;
 import org.slf4j.Logger;
@@ -43,8 +54,9 @@ class AmqpProtocolConverter {
     public static final EnumSet<EndpointState> INITIALIZED_SET = EnumSet.complementOf(UNINITIALIZED_SET);
     public static final EnumSet<EndpointState> ACTIVE_STATE = EnumSet.of(EndpointState.ACTIVE);
     public static final EnumSet<EndpointState> CLOSED_STATE = EnumSet.of(EndpointState.CLOSED);
+    public static final EnumSet<EndpointState> ALL_STATES = EnumSet.of(EndpointState.CLOSED, EndpointState.ACTIVE, EndpointState.UNINITIALIZED);
     private static final Logger LOG = LoggerFactory.getLogger(AmqpProtocolConverter.class);
-
+    static final public byte[] EMPTY_BYTE_ARRAY = new byte[]{};
     private final AmqpTransport amqpTransport;
 
     public AmqpProtocolConverter(AmqpTransport amqpTransport, BrokerContext brokerContext) {
@@ -78,6 +90,18 @@ class AmqpProtocolConverter {
 
     {
         this.protonTransport.bind(this.protonConnection);
+        this.protonTransport.setProtocolTracer(new ProtocolTracer() {
+            @Override
+            public void receivedFrame(TransportFrame transportFrame) {
+                System.out.println(String.format("RECV: %05d | %s", transportFrame.getChannel(), transportFrame.getBody()));
+            }
+
+            @Override
+            public void sentFrame(TransportFrame transportFrame) {
+                System.out.println(String.format("SENT: %05d | %s", transportFrame.getChannel(), transportFrame.getBody()));
+            }
+        });
+
     }
 
     void pumpProtonToSocket() {
@@ -88,7 +112,8 @@ class AmqpProtocolConverter {
             while (!done) {
                 int count = protonTransport.output(data, 0, size);
                 if (count > 0) {
-                    final Buffer buffer = new Buffer(data, 0, count);
+                    final Buffer buffer;
+                    buffer = new Buffer(data, 0, count);
 //                    System.out.println("writing: " + buffer.toString().substring(5).replaceAll("(..)", "$1 "));
                     amqpTransport.sendToAmqp(buffer);
                 } else {
@@ -107,7 +132,7 @@ class AmqpProtocolConverter {
         long nextConsumerId = 0;
 
         public AmqpSessionContext(ConnectionId connectionId, long id) {
-            sessionId = new SessionId(connectionId, -1);
+            sessionId = new SessionId(connectionId, id);
 
         }
     }
@@ -163,6 +188,13 @@ class AmqpProtocolConverter {
                 link = link.next(ACTIVE_STATE, CLOSED_STATE);
             }
 
+            link = protonConnection.linkHead(ACTIVE_STATE, ALL_STATES);
+            while (link != null) {
+                ((AmqpDeliveryListener)link.getContext()).drainCheck();
+                link = link.next(ACTIVE_STATE, CLOSED_STATE);
+            }
+
+
             session = protonConnection.sessionHead(ACTIVE_STATE, CLOSED_STATE);
             while (session != null) {
                 //TODO - close links?
@@ -170,8 +202,7 @@ class AmqpProtocolConverter {
                 session = session.next(ACTIVE_STATE, CLOSED_STATE);
             }
             if (protonConnection.getLocalState() == EndpointState.ACTIVE && protonConnection.getRemoteState() == EndpointState.CLOSED) {
-//                listener.onConnectionClose(protonConnection);
-                protonConnection.close();
+                doClose();
             }
 
         } catch (Throwable e) {
@@ -181,6 +212,35 @@ class AmqpProtocolConverter {
         pumpProtonToSocket();
     }
 
+    boolean closing = false;
+    boolean closedSocket = false;
+
+    private void doClose() {
+        if( !closing ) {
+            closing = true;
+            sendToActiveMQ(new RemoveInfo(connectionId), new ResponseHandler() {
+                @Override
+                public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
+                    protonConnection.close();
+                    if( !closedSocket) {
+                        pumpProtonToSocket();
+                    }
+                }
+            });
+        }
+    }
+
+
+    public void onAMQPException(IOException error) {
+        closedSocket = true;
+        if( !closing) {
+            System.out.println("AMQP client disconnected");
+            error.printStackTrace();
+        } else {
+            doClose();
+        }
+    }
+
     public void onActiveMQCommand(Command command) throws Exception {
         if (command.isResponse()) {
             Response response = (Response) command;
@@ -221,6 +281,7 @@ class AmqpProtocolConverter {
     static abstract class AmqpDeliveryListener {
         abstract public void onDelivery(Delivery delivery) throws Exception;
         public void onClose() throws Exception {}
+        public void drainCheck() {}
     }
 
     private void onConnectionOpen() throws AmqpProtocolException {
@@ -278,13 +339,17 @@ class AmqpProtocolConverter {
 
     private void onSessionClose(Session session) {
         AmqpSessionContext sessionContext = (AmqpSessionContext)session.getContext();
-        sendToActiveMQ(new RemoveInfo(sessionContext.sessionId), null);
+        if( sessionContext!=null ) {
+            System.out.println(sessionContext.sessionId);
+            sendToActiveMQ(new RemoveInfo(sessionContext.sessionId), null);
+            session.setContext(null);
+        }
         session.close();
     }
 
     private void onLinkOpen(Link link) {
-        link.setLocalSourceAddress(link.getRemoteSourceAddress());
-        link.setLocalTargetAddress(link.getRemoteTargetAddress());
+        link.setSource(link.getRemoteSource());
+        link.setTarget(link.getRemoteTarget());
 
         AmqpSessionContext sessionContext = (AmqpSessionContext) link.getSession().getContext();
         if (link instanceof Receiver) {
@@ -296,17 +361,9 @@ class AmqpProtocolConverter {
 
     InboundTransformer inboundTransformer = new AMQPNativeInboundTransformer(ActiveMQJMSVendor.INSTANCE);
 
-    class ProducerContext extends AmqpDeliveryListener {
-        private final ProducerId producerId;
-        private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
-        private final ActiveMQDestination destination;
-        ByteArrayOutputStream current = new ByteArrayOutputStream();
-
-        public ProducerContext(ProducerId producerId, ActiveMQDestination destination) {
-            this.producerId = producerId;
-            this.destination = destination;
-        }
+    abstract class BaseProducerContext extends AmqpDeliveryListener {
 
+        ByteArrayOutputStream current = new ByteArrayOutputStream();
 
         @Override
         public void onDelivery(Delivery delivery) throws Exception {
@@ -336,7 +393,26 @@ class AmqpProtocolConverter {
             receiver.advance();
             delivery.settle();
 
-            final Buffer buffer = current.toBuffer();
+            Buffer buffer = current.toBuffer();
+            current = null;
+            onMessage(receiver, delivery, buffer);
+        }
+
+        abstract protected void onMessage(Receiver receiver, Delivery delivery, Buffer buffer) throws Exception;
+    }
+
+    class ProducerContext extends BaseProducerContext {
+        private final ProducerId producerId;
+        private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
+        private final ActiveMQDestination destination;
+
+        public ProducerContext(ProducerId producerId, ActiveMQDestination destination) {
+            this.producerId = producerId;
+            this.destination = destination;
+        }
+
+        @Override
+        protected void onMessage(Receiver receiver, Delivery delivery, Buffer buffer) throws Exception {
             EncodedMessage em = new EncodedMessage(delivery.getMessageFormat(), buffer.data, buffer.offset, buffer.length);
             final ActiveMQMessage message = (ActiveMQMessage) inboundTransformer.transform(em);
             current = null;
@@ -348,6 +424,14 @@ class AmqpProtocolConverter {
             if( message.getMessageId()==null ) {
                 message.setMessageId(new MessageId(producerId, messageIdGenerator.getNextSequenceId()));
             }
+
+            DeliveryState remoteState = delivery.getRemoteState();
+            if( remoteState!=null && remoteState instanceof TransactionalState) {
+                TransactionalState s = (TransactionalState) remoteState;
+                long txid = toLong(s.getTxnId());
+                message.setTransactionId(new LocalTransactionId(connectionId, txid));
+            }
+
             message.onSend();
 //            sendToActiveMQ(message, createResponseHandler(command));
             sendToActiveMQ(message, null);
@@ -355,37 +439,166 @@ class AmqpProtocolConverter {
 
     }
 
+    long nextTransactionId = 0;
+    class Transaction {
 
-    void onReceiverOpen(final Receiver receiver, AmqpSessionContext sessionContext) {
-        // Client is producing to this receiver object
+    }
+    HashMap<Long, Transaction> transactions = new HashMap<Long, Transaction>();
 
-        ProducerId producerId = new ProducerId(sessionContext.sessionId, sessionContext.nextProducerId++);
-        ActiveMQDestination dest = ActiveMQDestination.createDestination(receiver.getRemoteTargetAddress(), ActiveMQDestination.QUEUE_TYPE);
-        ProducerContext producerContext = new ProducerContext(producerId, dest);
-
-        receiver.setContext(producerContext);
-        receiver.flow(1024 * 64);
-        ProducerInfo producerInfo = new ProducerInfo(producerId);
-        producerInfo.setDestination(dest);
-        sendToActiveMQ(producerInfo, new ResponseHandler() {
-            public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
-                receiver.open();
-                if (response.isException()) {
-                    // If the connection attempt fails we close the socket.
-                    Throwable exception = ((ExceptionResponse) response).getException();
-                    receiver.close();
+    public byte[] toBytes(long value) {
+        Buffer buffer = new Buffer(8);
+        buffer.bigEndianEditor().writeLong(value);
+        return buffer.data;
+    }
+
+    private long toLong(Binary value) {
+        Buffer buffer = new Buffer(value.getArray(), value.getArrayOffset(), value.getLength());
+        return buffer.bigEndianEditor().readLong();
+    }
+
+
+    AmqpDeliveryListener coordinatorContext = new BaseProducerContext() {
+        @Override
+        protected void onMessage(Receiver receiver, final Delivery delivery, Buffer buffer) throws Exception {
+
+            org.apache.qpid.proton.message.Message msg = new org.apache.qpid.proton.message.Message();
+
+            int offset = buffer.offset;
+            int len = buffer.length;
+            while( len > 0 ) {
+                final int decoded = msg.decode(buffer.data, offset, len);
+                assert decoded > 0: "Make progress decoding the message";
+                offset += decoded;
+                len -= decoded;
+            }
+
+            Object action = ((AmqpValue)msg.getBody()).getValue();
+            System.out.println("COORDINATOR received: "+action+", ["+buffer+"]");
+            if( action instanceof Declare ) {
+                Declare declare = (Declare) action;
+                if( declare.getGlobalId()!=null ) {
+                    throw new Exception("don't know how to handle a declare /w a set GlobalId");
                 }
-                pumpProtonToSocket();
+
+                long txid = nextTransactionId++;
+                TransactionInfo txinfo = new TransactionInfo(connectionId, new LocalTransactionId(connectionId, txid), TransactionInfo.BEGIN);
+                sendToActiveMQ(txinfo, null);
+                System.out.println("started transaction "+txid);
+
+                Declared declared = new Declared();
+                declared.setTxnId(new Binary(toBytes(txid)));
+                delivery.disposition(declared);
+                delivery.settle();
+
+            } else if( action instanceof Discharge) {
+                Discharge discharge = (Discharge) action;
+                long txid = toLong(discharge.getTxnId());
+
+                byte operation;
+                if( discharge.getFail() ) {
+                    System.out.println("rollback transaction "+txid);
+                    operation = TransactionInfo.ROLLBACK ;
+                } else {
+                    System.out.println("commit transaction "+txid);
+                    operation = TransactionInfo.COMMIT_ONE_PHASE;
+                }
+                TransactionInfo txinfo = new TransactionInfo(connectionId, new LocalTransactionId(connectionId, txid), operation);
+                sendToActiveMQ(txinfo, new ResponseHandler() {
+                    @Override
+                    public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
+                        if( response.isException() ) {
+                            ExceptionResponse er = (ExceptionResponse)response;
+                            Rejected rejected = new Rejected();
+                            ArrayList errors = new ArrayList();
+                            errors.add(er.getException().getMessage());
+                            rejected.setError(errors);
+                            delivery.disposition(rejected);
+                        }
+                        delivery.settle();
+                        pumpProtonToSocket();
+                    }
+                });
+                receiver.advance();
+
+            } else {
+                throw new Exception("Expected coordinator message type: "+action.getClass());
             }
-        });
 
+        }
+
+    };
+
+
+    void onReceiverOpen(final Receiver receiver, AmqpSessionContext sessionContext) {
+        // Client is producing to this receiver object
+        org.apache.qpid.proton.type.transport.Target remoteTarget = receiver.getRemoteTarget();
+        if( remoteTarget instanceof Coordinator ) {
+            pumpProtonToSocket();
+            receiver.setContext(coordinatorContext);
+            receiver.flow(1024 * 64);
+            receiver.open();
+            pumpProtonToSocket();
+        } else {
+            ProducerId producerId = new ProducerId(sessionContext.sessionId, sessionContext.nextProducerId++);
+            ActiveMQDestination dest = createDestination(remoteTarget);
+            ProducerContext producerContext = new ProducerContext(producerId, dest);
+
+            receiver.setContext(producerContext);
+            receiver.flow(1024 * 64);
+            ProducerInfo producerInfo = new ProducerInfo(producerId);
+            producerInfo.setDestination(dest);
+            sendToActiveMQ(producerInfo, new ResponseHandler() {
+                public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
+                    receiver.open();
+                    if (response.isException()) {
+                        // If the connection attempt fails we close the socket.
+                        Throwable exception = ((ExceptionResponse) response).getException();
+                        receiver.close();
+                    }
+                    pumpProtonToSocket();
+                }
+            });
+        }
+
+
+    }
+
+    private ActiveMQDestination createDestination(Object terminus) {
+        if( terminus == null ) {
+            return null;
+        } else if( terminus instanceof org.apache.qpid.proton.type.messaging.Source) {
+            org.apache.qpid.proton.type.messaging.Source source = (org.apache.qpid.proton.type.messaging.Source)terminus;
+            return ActiveMQDestination.createDestination(source.getAddress(), ActiveMQDestination.QUEUE_TYPE);
+        } else if( terminus instanceof org.apache.qpid.proton.type.messaging.Target) {
+            org.apache.qpid.proton.type.messaging.Target target = (org.apache.qpid.proton.type.messaging.Target)terminus;
+            return ActiveMQDestination.createDestination(target.getAddress(), ActiveMQDestination.QUEUE_TYPE);
+        } else if( terminus instanceof Coordinator ) {
+            Coordinator target = (Coordinator)terminus;
+            return null;
+        } else {
+            throw new RuntimeException("Unexpected terminus type: "+terminus);
+        }
+    }
+
+    private Source createSource(ActiveMQDestination dest) {
+        org.apache.qpid.proton.type.messaging.Source rc = new org.apache.qpid.proton.type.messaging.Source();
+        rc.setAddress(inboundTransformer.getVendor().toAddress(dest));
+        return rc;
     }
 
     OutboundTransformer outboundTransformer = new AutoOutboundTransformer(ActiveMQJMSVendor.INSTANCE);
 
+
     class ConsumerContext extends AmqpDeliveryListener {
         private final ConsumerId consumerId;
         private final Sender sender;
+        private boolean presettle;
+
+        public ConsumerContext(ConsumerId consumerId, Sender sender) {
+            this.consumerId = consumerId;
+            this.sender = sender;
+            this.presettle = sender.getRemoteSenderSettleMode() == SenderSettleMode.SETTLED;
+        }
 
         long nextTagId = 0;
         HashSet<byte[]> tagCache = new HashSet<byte[]>();
@@ -406,11 +619,13 @@ class AmqpProtocolConverter {
             return rc;
         }
 
-        public ConsumerContext(ConsumerId consumerId, Sender sender) {
-            this.consumerId = consumerId;
-            this.sender = sender;
+        void checkinTag(byte[] data) {
+            if( tagCache.size() < 1024 ) {
+                tagCache.add(data);
+            }
         }
 
+
         @Override
         public void onClose() throws Exception {
             sendToActiveMQ(new RemoveInfo(consumerId), null);
@@ -428,7 +643,7 @@ class AmqpProtocolConverter {
         Buffer currentBuffer;
         Delivery currentDelivery;
 
-        public void pumpOutbound() {
+        public void pumpOutbound() throws Exception {
             while(true) {
 
                 while( currentBuffer !=null ) {
@@ -436,8 +651,11 @@ class AmqpProtocolConverter {
                     if( sent > 0 ) {
                         currentBuffer.moveHead(sent);
                         if( currentBuffer.length == 0 ) {
-                            currentDelivery.settle();
-                            sender.advance();
+                            if( presettle ) {
+                                settle(currentDelivery, MessageAck.INDIVIDUAL_ACK_TYPE);
+                            } else {
+                                sender.advance();
+                            }
                             currentBuffer = null;
                             currentDelivery = null;
                         }
@@ -453,12 +671,17 @@ class AmqpProtocolConverter {
                 final MessageDispatch md = outbound.removeFirst();
                 try {
                     final ActiveMQMessage jms = (ActiveMQMessage) md.getMessage();
+                    jms.setRedeliveryCounter(md.getRedeliveryCounter());
                     final EncodedMessage amqp = outboundTransformer.transform(jms);
                     if( amqp!=null && amqp.getLength() > 0 ) {
 
                         currentBuffer = new Buffer(amqp.getArray(), amqp.getArrayOffset(), amqp.getLength());
-                        final byte[] tag = nextTag();
-                        currentDelivery = sender.delivery(tag, 0, tag.length);
+                        if( presettle ) {
+                            currentDelivery = sender.delivery(EMPTY_BYTE_ARRAY, 0, 0);
+                        } else {
+                            final byte[] tag = nextTag();
+                            currentDelivery = sender.delivery(tag, 0, tag.length);
+                        }
                         currentDelivery.setContext(md);
 
                     } else {
@@ -471,12 +694,81 @@ class AmqpProtocolConverter {
             }
         }
 
-        @Override
-        public void onDelivery(Delivery delivery) throws JMSException {
-            if( delivery.remotelySettled() ) {
+        private void settle(final Delivery delivery, int ackType) throws Exception {
+            byte[] tag = delivery.getTag();
+            if( tag !=null && tag.length>0 ) {
+                checkinTag(tag);
+            }
+
+            if( ackType == -1) {
+                // we are going to settle, but redeliver.. we we won't yet ack to ActiveMQ
+                delivery.settle();
+                onMessageDispatch((MessageDispatch) delivery.getContext());
+            } else {
                 MessageDispatch md = (MessageDispatch) delivery.getContext();
-                pumpOutbound();
+                MessageAck ack = new MessageAck();
+                ack.setConsumerId(consumerId);
+                ack.setFirstMessageId(md.getMessage().getMessageId());
+                ack.setLastMessageId(md.getMessage().getMessageId());
+                ack.setMessageCount(1);
+                ack.setAckType((byte)ackType);
+
+                DeliveryState remoteState = delivery.getRemoteState();
+                if( remoteState!=null && remoteState instanceof TransactionalState) {
+                    TransactionalState s = (TransactionalState) remoteState;
+                    long txid = toLong(s.getTxnId());
+                    ack.setTransactionId(new LocalTransactionId(connectionId, txid));
+                }
+
+                sendToActiveMQ(ack, new ResponseHandler() {
+                    @Override
+                    public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
+                        delivery.settle();
+                        pumpProtonToSocket();
+                    }
+                });
+            }
+        }
+
+        @Override
+        public void drainCheck() {
+            if( outbound.isEmpty() ) {
+                sender.drained();
+            }
+        }
+
+        @Override
+        public void onDelivery(Delivery delivery) throws Exception {
+            MessageDispatch md = (MessageDispatch) delivery.getContext();
+            final DeliveryState state = delivery.getRemoteState();
+            if( state instanceof Accepted ) {
+                if( !delivery.remotelySettled() ) {
+                    delivery.disposition(new Accepted());
+                }
+                settle(delivery, MessageAck.INDIVIDUAL_ACK_TYPE);
+            } else if( state instanceof Rejected) {
+                // re-deliver /w incremented delivery counter.
+                md.setRedeliveryCounter(md.getRedeliveryCounter() + 1);
+                settle(delivery, -1);
+            } else if( state instanceof Released) {
+                // re-deliver && don't increment the counter.
+                settle(delivery, -1);
+            } else if( state instanceof Modified) {
+                Modified modified = (Modified) state;
+                if ( modified.getDeliveryFailed() ) {
+                  // increment delivery counter..
+                  md.setRedeliveryCounter(md.getRedeliveryCounter() + 1);
+                }
+                byte ackType = -1;
+                Boolean undeliverableHere = modified.getUndeliverableHere();
+                if( undeliverableHere !=null && undeliverableHere ) {
+                    // receiver does not want the message..
+                    // perhaps we should DLQ it?
+                    ackType = MessageAck.POSION_ACK_TYPE;
+                }
+                settle(delivery, ackType);
             }
+            pumpOutbound();
         }
 
     }
@@ -485,14 +777,16 @@ class AmqpProtocolConverter {
 
     void onSenderOpen(final Sender sender, AmqpSessionContext sessionContext) {
 
+        // sender.get
         ConsumerId id = new ConsumerId(sessionContext.sessionId, sessionContext.nextConsumerId++);
         ConsumerContext consumerContext = new ConsumerContext(id, sender);
 
         subscriptionsByConsumerId.put(id, consumerContext);
 
         ActiveMQDestination dest;
-        if( sender.getRemoteSourceAddress() != null ) {
-            dest = ActiveMQDestination.createDestination(sender.getRemoteSourceAddress(), ActiveMQDestination.QUEUE_TYPE);
+        final Source remoteSource = sender.getRemoteSource();
+        if( remoteSource != null ) {
+            dest = createDestination(remoteSource);
         } else {
             // lets create a temp dest.
 //            if (topic) {
@@ -507,7 +801,7 @@ class AmqpProtocolConverter {
             info.setDestination(dest);
             sendToActiveMQ(info, null);
             tempDestinations.put(sender, dest);
-            sender.setLocalSourceAddress(inboundTransformer.getVendor().toAddress(dest));
+            sender.setSource(createSource(dest));
         }
 
 

Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java?rev=1400116&r1=1400115&r2=1400116&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java Fri Oct 19 14:28:02 2012
@@ -70,6 +70,20 @@ public class AmqpTransportFilter extends
         }
     }
 
+    @Override
+    public void onException(IOException error) {
+        try {
+            protocolConverter.lock.lock();
+            try {
+                protocolConverter.onAMQPException(error);
+            } finally {
+                protocolConverter.lock.unlock();
+            }
+        } finally {
+            super.onException(error);
+        }
+    }
+
     public void onCommand(Object command) {
         try {
             if (trace) {
@@ -140,6 +154,8 @@ public class AmqpTransportFilter extends
         return this.wireFormat;
     }
 
+
+
     public void handleException(IOException e) {
         super.onException(e);
     }

Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeOutboundTransformer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeOutboundTransformer.java?rev=1400116&r1=1400115&r2=1400116&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeOutboundTransformer.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeOutboundTransformer.java Fri Oct 19 14:28:02 2012
@@ -16,10 +16,15 @@
  */
 package org.apache.activemq.transport.amqp.transform;
 
+import org.apache.qpid.proton.codec.CompositeWritableBuffer;
+import org.apache.qpid.proton.codec.WritableBuffer;
+import org.apache.qpid.proton.type.UnsignedInteger;
+
 import javax.jms.BytesMessage;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageFormatException;
+import java.nio.ByteBuffer;
 
 /**
 * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -54,8 +59,43 @@ public class AMQPNativeOutboundTransform
             return null;
         }
         byte data[] = new byte[(int) msg.getBodyLength()];
+        int dataSize = data.length;
         msg.readBytes(data);
-        return new EncodedMessage(messageFormat, data, 0, data.length);
+        msg.reset();
+
+        try {
+            int count = msg.getIntProperty("JMSXDeliveryCount");
+            if( count > 1 ) {
+
+                // decode...
+                org.apache.qpid.proton.message.Message amqp = new org.apache.qpid.proton.message.Message();
+                int offset = 0;
+                int len = data.length;
+                while( len > 0 ) {
+                    final int decoded = amqp.decode(data, offset, len);
+                    assert decoded > 0: "Make progress decoding the message";
+                    offset += decoded;
+                    len -= decoded;
+                }
+
+                // Update the DeliveryCount header...
+                amqp.getHeader().setDeliveryCount(new UnsignedInteger(count));
+
+                // Re-encode...
+                ByteBuffer buffer = ByteBuffer.wrap(new byte[1024*4]);
+                final DroppingWritableBuffer overflow = new DroppingWritableBuffer();
+                int c = amqp.encode(new CompositeWritableBuffer(new WritableBuffer.ByteBufferWrapper(buffer), overflow));
+                if( overflow.position() > 0 ) {
+                    buffer = ByteBuffer.wrap(new byte[1024*4+overflow.position()]);
+                    c = amqp.encode(new WritableBuffer.ByteBufferWrapper(buffer));
+                }
+                data = buffer.array();
+                dataSize = c;
+            }
+        } catch (JMSException e) {
+        }
+
+        return new EncodedMessage(messageFormat, data, 0, dataSize);
     }
 
 }

Modified: activemq/trunk/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp+ssl
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp%2Bssl?rev=1400116&r1=1400115&r2=1400116&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp+ssl (original)
+++ activemq/trunk/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp+ssl Fri Oct 19 14:28:02 2012
@@ -14,4 +14,4 @@
 ## See the License for the specific language governing permissions and
 ## limitations under the License.
 ## ---------------------------------------------------------------------------
-class=org.apache.activemq.transport.amqp.AmqpSslTransportFactory
+class=org.apache.activemq.transport.amqp.AMQPSslTransportFactory

Modified: activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java?rev=1400116&r1=1400115&r2=1400116&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java (original)
+++ activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java Fri Oct 19 14:28:02 2012
@@ -19,12 +19,15 @@ package org.apache.activemq.transport.am
 import junit.framework.TestCase;
 import org.apache.activemq.AutoFailTestSupport;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.SslContext;
 import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.spring.SpringSslContext;
 import org.junit.After;
 import org.junit.Before;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.util.Vector;
 
 import static org.fusesource.hawtbuf.UTF8Buffer.utf8;
@@ -40,21 +43,51 @@ public class AmqpTestSupport {
     protected int numberOfMessages;
     AutoFailTestSupport autoFailTestSupport = new AutoFailTestSupport() {};
     protected int port;
+    protected int sslPort;
+
+
+    public static void main(String[] args) throws Exception {
+        final AmqpTestSupport s = new AmqpTestSupport();
+        s.sslPort = 5671;
+        s.port = 5672;
+        s.startBroker();
+        while(true) {
+            Thread.sleep(100000);
+        }
+    }
 
     @Before
-    public void startBroker() throws Exception {
+    public void setUp() throws Exception {
         autoFailTestSupport.startAutoFailThread();
         exceptions.clear();
+        startBroker();
+    }
+
+    public void startBroker() throws Exception {
         brokerService = new BrokerService();
         brokerService.setPersistent(false);
         brokerService.setAdvisorySupport(false);
+
+        // Setup SSL context...
+        final File classesDir = new File(AmqpProtocolConverter.class.getProtectionDomain().getCodeSource().getLocation().getFile());
+        File keystore = new File(classesDir, "../../src/test/resources/keystore");
+        final SpringSslContext sslContext = new SpringSslContext();
+        sslContext.setKeyStore(keystore.getCanonicalPath());
+        sslContext.setKeyStorePassword("password");
+        sslContext.setTrustStore(keystore.getCanonicalPath());
+        sslContext.setTrustStorePassword("password");
+        sslContext.afterPropertiesSet();
+        brokerService.setSslContext(sslContext);
+
         addAMQPConnector();
         brokerService.start();
         this.numberOfMessages = 2000;
     }
 
     protected void addAMQPConnector() throws Exception {
-        final TransportConnector connector = brokerService.addConnector("amqp://localhost:0");
+        TransportConnector connector =brokerService.addConnector("amqp+ssl://0.0.0.0:"+sslPort);
+        sslPort = connector.getConnectUri().getPort();
+        connector = brokerService.addConnector("amqp://0.0.0.0:"+port);
         port = connector.getConnectUri().getPort();
     }
 

Modified: activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java?rev=1400116&r1=1400115&r2=1400116&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java (original)
+++ activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java Fri Oct 19 14:28:02 2012
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.transport.amqp;
 
+import org.apache.activemq.transport.amqp.joram.ActiveMQAdmin;
 import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
 import org.apache.qpid.amqp_1_0.jms.impl.QueueImpl;
 import org.junit.Test;
@@ -30,59 +31,114 @@ import static org.junit.Assert.assertEqu
 public class JMSClientTest extends AmqpTestSupport {
 
     @Test
-    public void testSendReceive() throws Exception {
+    public void testTransactions() throws Exception {
+        ActiveMQAdmin.enableJMSFrameTracing();
+        QueueImpl queue = new QueueImpl("queue://txqueue");
+
+        Connection connection = createConnection();
+        {
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageProducer p = session.createProducer(queue);
+            p.send(session.createTextMessage("Hello World"));
+//            session.commit();
+
+            MessageConsumer c = session.createConsumer(queue);
+            Message msg = c.receive();
+            System.out.println("first:"+msg);
+            System.out.println(msg.getJMSRedelivered());
+
+//            session.rollback();
+//
+//            msg = c.receive();
+//            System.out.println("second:"+msg);
+//            System.out.println(msg.getJMSRedelivered());
+        }
+        connection.close();
 
-        QueueImpl queue = new QueueImpl("queue://testqueue");
-        int nMsgs = 100;
-        final String dataFormat = "%010240d";
+    }
 
-        final ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", port, null, null);
+//    @Test
+//    public void testSendReceive() throws Exception {
+//        ActiveMQAdmin.enableJMSFrameTracing();
+//        QueueImpl queue = new QueueImpl("queue://testqueue");
+//        int nMsgs = 1;
+//        final String dataFormat = "%01024d";
+//
+//
+//        try {
+//            Connection connection = createConnection();
+//            {
+//                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//                MessageProducer p = session.createProducer(queue);
+//                for (int i = 0; i < nMsgs; i++) {
+//                    System.out.println("Sending " + i);
+//                    p.send(session.createTextMessage(String.format(dataFormat, i)));
+//                }
+//            }
+//            connection.close();
+//
+//            System.out.println("=======================================================================================");
+//            System.out.println(" failing a receive ");
+//            System.out.println("=======================================================================================");
+//            connection = createConnection();
+//            {
+//                Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+//                MessageConsumer c = session.createConsumer(queue);
+//
+//                // Receive messages non-transacted
+//                int i = 0;
+//                while ( i < 1) {
+//                    TextMessage msg = (TextMessage) c.receive();
+//                    if( msg!=null ) {
+//                        String s = msg.getText();
+//                        assertEquals(String.format(dataFormat, i), s);
+//                        System.out.println("Received: " + i);
+//                        i++;
+//                    }
+//                }
+//            }
+//            connection.close();
+//
+//
+//            System.out.println("=======================================================================================");
+//            System.out.println(" receiving ");
+//            System.out.println("=======================================================================================");
+//            connection = createConnection();
+//            {
+//                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//                MessageConsumer c = session.createConsumer(queue);
+//
+//                // Receive messages non-transacted
+//                int i = 0;
+//                while ( i < nMsgs) {
+//                    TextMessage msg = (TextMessage) c.receive();
+//                    if( msg!=null ) {
+//                        String s = msg.getText();
+//                        assertEquals(String.format(dataFormat, i), s);
+//                        System.out.println("Received: " + i);
+//                        i++;
+//                    }
+//                }
+//            }
+//            connection.close();
+//
+//        } catch (Exception e) {
+//            e.printStackTrace();
+//        }
+//
+//    }
 
-        try {
-            final Connection connection = factory.createConnection();
-            connection.setExceptionListener(new ExceptionListener() {
-                @Override
-                public void onException(JMSException exception) {
-                    exception.printStackTrace();
-                }
-            });
-            connection.start();
-            {
-                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-                MessageProducer p = session.createProducer(queue);
-                for (int i = 0; i < nMsgs; i++) {
-                    System.out.println("Sending " + i);
-                    p.send(session.createTextMessage(String.format(dataFormat, i)));
-                }
-                p.close();
-                session.close();
-            }
-            System.out.println("=======================================================================================");
-            System.out.println(" receiving ");
-            System.out.println("=======================================================================================");
-            {
-                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-                MessageConsumer c = session.createConsumer(queue);
-
-                // Receive messages non-transacted
-                int i = 0;
-                while ( i < nMsgs) {
-                    TextMessage msg = (TextMessage) c.receive();
-                    if( msg!=null ) {
-                        String s = msg.getText();
-                        assertEquals(String.format(dataFormat, i), s);
-                        System.out.println("Received: " + i);
-                        i++;
-                    }
-                }
-                c.close();
-                session.close();
+    private Connection createConnection() throws JMSException {
+        final ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", port, null, null);
+        final Connection connection = factory.createConnection();
+        connection.setExceptionListener(new ExceptionListener() {
+            @Override
+            public void onException(JMSException exception) {
+                exception.printStackTrace();
             }
-            connection.close();
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-
+        });
+        connection.start();
+        return connection;
     }
 
 }

Modified: activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/joram/ActiveMQAdmin.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/joram/ActiveMQAdmin.java?rev=1400116&r1=1400115&r2=1400116&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/joram/ActiveMQAdmin.java (original)
+++ activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/joram/ActiveMQAdmin.java Fri Oct 19 14:28:02 2012
@@ -29,6 +29,9 @@ import javax.naming.Context;
 import javax.naming.InitialContext;
 import javax.naming.NamingException;
 import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
 import java.net.URI;
 import java.util.Hashtable;
 import java.util.logging.*;
@@ -53,17 +56,18 @@ public class ActiveMQAdmin implements Ad
         }
     }
 
-    private void enableJMSFrameTracing() {
+    static public void enableJMSFrameTracing() throws FileNotFoundException {
         final SimpleFormatter formatter = new SimpleFormatter();
+        final PrintStream out = new PrintStream(new FileOutputStream(new File("/tmp/amqp-trace.txt")));
         Handler handler = new Handler() {
             @Override
             public void publish(LogRecord r) {
-                System.out.println(String.format("%s:%s", r.getLoggerName(), r.getMessage()));
+                out.println(String.format("%s:%s", r.getLoggerName(), r.getMessage()));
             }
 
             @Override
             public void flush() {
-                System.out.flush();
+                out.flush();
             }
 
             @Override
@@ -71,7 +75,7 @@ public class ActiveMQAdmin implements Ad
             }
         };
 
-        Logger log = Logger.getLogger("RAW");
+        Logger log = Logger.getLogger("FRM");
         log.addHandler(handler);
         log.setLevel(Level.FINEST);
     }

Modified: activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/joram/JoramJmsTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/joram/JoramJmsTest.java?rev=1400116&r1=1400115&r2=1400116&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/joram/JoramJmsTest.java (original)
+++ activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/joram/JoramJmsTest.java Fri Oct 19 14:28:02 2012
@@ -47,6 +47,8 @@ public class JoramJmsTest extends TestCa
         TestSuite suite = new TestSuite();
 
         // Passing tests
+        suite.addTestSuite(ConnectionTest.class);
+        suite.addTestSuite(SessionTest.class);
         suite.addTestSuite(JMSXPropertyTest.class);
         suite.addTestSuite(MessageBodyTest.class);
         suite.addTestSuite(MessageDefaultTest.class);
@@ -54,13 +56,7 @@ public class JoramJmsTest extends TestCa
         suite.addTestSuite(MessagePropertyTest.class);
 
         if (false ) {
-// TODO: Fails due to JMS client impl error.
-        suite.addTestSuite(UnifiedSessionTest.class);
-// TODO: Fails due to https://issues.apache.org/jira/browse/PROTON-62: ClassCastException when processing an Attach frame
-        suite.addTestSuite(QueueSessionTest.class);
-        suite.addTestSuite(SessionTest.class);
-// TODO: Fails due to inconsistent ObjectMessage mapping in the JMS client.
-        suite.addTestSuite(MessageTypeTest.class);
+
 // TODO: Fails due to temp destinations not being supported yet.
         suite.addTestSuite(MessageHeaderTest.class);
         suite.addTestSuite(TemporaryQueueTest.class);
@@ -68,13 +64,17 @@ public class JoramJmsTest extends TestCa
 // TODO: Fails due to selectors not being implemented yet.
         suite.addTestSuite(SelectorSyntaxTest.class);
         suite.addTestSuite(SelectorTest.class);
+        suite.addTestSuite(QueueSessionTest.class);
+// TODO: Browsers not yet supported.
+        suite.addTestSuite(QueueBrowserTest.class);
+// TODO: Fails due to JMS client impl error.
+        suite.addTestSuite(UnifiedSessionTest.class);
+// TODO: Fails due to inconsistent ObjectMessage mapping in the JMS client.
+        suite.addTestSuite(MessageTypeTest.class);
 // TODO: Fails due to: javax.jms.IllegalStateException: Cannot set client-id to "publisherConnection"; client-id must be set on connection creation
         suite.addTestSuite(TopicConnectionTest.class);
         suite.addTestSuite(TopicSessionTest.class);
-// TODO: figure out why the following tests fail..
-// TODO: figure out why the following tests hang..
-        suite.addTestSuite(ConnectionTest.class);
-        suite.addTestSuite(QueueBrowserTest.class);
+
 
         }
         return suite;

Added: activemq/trunk/activemq-amqp/src/test/resources/keystore
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/test/resources/keystore?rev=1400116&view=auto
==============================================================================
Files activemq/trunk/activemq-amqp/src/test/resources/keystore (added) and activemq/trunk/activemq-amqp/src/test/resources/keystore Fri Oct 19 14:28:02 2012 differ