You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/05/30 20:07:54 UTC

svn commit: r1487950 - in /activemq/trunk/activemq-amqp/src: main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java test/java/org/apache/activemq/transport/amqp/AMQ4563Test.java

Author: tabish
Date: Thu May 30 18:07:54 2013
New Revision: 1487950

URL: http://svn.apache.org/r1487950
Log:
https://issues.apache.org/jira/browse/AMQ-4563

Updated the test case some more, shows that its a problem with the MessageId handling when we receive Messages from the AMQP JMS client, when we use our own JMS client to send the the IDs are correct and we can properly ack them

Modified:
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
    activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4563Test.java

Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java?rev=1487950&r1=1487949&r2=1487950&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java Thu May 30 18:07:54 2013
@@ -109,7 +109,7 @@ class AmqpProtocolConverter {
     public static final EnumSet<EndpointState> CLOSED_STATE = EnumSet.of(EndpointState.CLOSED);
     public static final EnumSet<EndpointState> ALL_STATES = EnumSet.of(EndpointState.CLOSED, EndpointState.ACTIVE, EndpointState.UNINITIALIZED);
     private static final Logger LOG = LoggerFactory.getLogger(AmqpProtocolConverter.class);
-    static final public byte[] EMPTY_BYTE_ARRAY = new byte[]{};
+    static final public byte[] EMPTY_BYTE_ARRAY = new byte[] {};
     private final AmqpTransport amqpTransport;
     private static final Symbol COPY = Symbol.getSymbol("copy");
     private static final Symbol JMS_SELECTOR = Symbol.valueOf("jms-selector");
@@ -126,7 +126,7 @@ class AmqpProtocolConverter {
     public AmqpProtocolConverter(AmqpTransport transport, BrokerContext brokerContext) {
         this.amqpTransport = transport;
         this.protonTransport.bind(this.protonConnection);
-        if( transport.isTrace() ) {
+        if (transport.isTrace()) {
             this.protonTransport.setProtocolTracer(new ProtocolTracer() {
                 @Override
                 public void receivedFrame(TransportFrame transportFrame) {
@@ -151,13 +151,13 @@ class AmqpProtocolConverter {
                 if (count > 0) {
                     final Buffer buffer;
                     buffer = new Buffer(data, 0, count);
-//                    System.out.println("writing: " + buffer.toString().substring(5).replaceAll("(..)", "$1 "));
+                    // System.out.println("writing: " + buffer.toString().substring(5).replaceAll("(..)", "$1 "));
                     amqpTransport.sendToAmqp(buffer);
                 } else {
                     done = true;
                 }
             }
-//            System.out.println("write done");
+            // System.out.println("write done");
         } catch (IOException e) {
             amqpTransport.onException(e);
         }
@@ -170,7 +170,6 @@ class AmqpProtocolConverter {
 
         public AmqpSessionContext(ConnectionId connectionId, long id) {
             sessionId = new SessionId(connectionId, id);
-
         }
     }
 
@@ -181,29 +180,29 @@ class AmqpProtocolConverter {
      */
     public void onAMQPData(Object command) throws Exception {
         Buffer frame;
-        if( command.getClass() == AmqpHeader.class ) {
-            AmqpHeader header = (AmqpHeader)command;
-            switch( header.getProtocolId() ) {
+        if (command.getClass() == AmqpHeader.class) {
+            AmqpHeader header = (AmqpHeader) command;
+            switch (header.getProtocolId()) {
                 case 0:
                     // amqpTransport.sendToAmqp(new AmqpHeader());
                     break; // nothing to do..
                 case 3: // Client will be using SASL for auth..
                     sasl = protonTransport.sasl();
-                    sasl.setMechanisms(new String[]{"ANONYMOUS", "PLAIN"});
+                    sasl.setMechanisms(new String[] { "ANONYMOUS", "PLAIN" });
                     sasl.server();
                     break;
                 default:
             }
             frame = header.getBuffer();
         } else {
-            frame = (Buffer)command;
+            frame = (Buffer) command;
         }
         onFrame(frame);
     }
 
     public void onFrame(Buffer frame) throws Exception {
-//        System.out.println("read: " + frame.toString().substring(5).replaceAll("(..)", "$1 "));
-        while( frame.length > 0 ) {
+        // System.out.println("read: " + frame.toString().substring(5).replaceAll("(..)", "$1 "));
+        while (frame.length > 0) {
             try {
                 int count = protonTransport.input(frame.data, frame.offset, frame.length);
                 frame.moveHead(count);
@@ -211,26 +210,26 @@ class AmqpProtocolConverter {
                 handleException(new AmqpProtocolException("Could not decode AMQP frame: " + frame, true, e));
                 return;
             }
-            try {
 
-                if( sasl!=null ) {
+            try {
+                if (sasl != null) {
                     // Lets try to complete the sasl handshake.
-                    if( sasl.getRemoteMechanisms().length > 0 ) {
-                        if( "PLAIN".equals(sasl.getRemoteMechanisms()[0]) ) {
+                    if (sasl.getRemoteMechanisms().length > 0) {
+                        if ("PLAIN".equals(sasl.getRemoteMechanisms()[0])) {
                             byte[] data = new byte[sasl.pending()];
                             sasl.recv(data, 0, data.length);
                             Buffer[] parts = new Buffer(data).split((byte) 0);
-                            if( parts.length > 0 ) {
+                            if (parts.length > 0) {
                                 connectionInfo.setUserName(parts[0].utf8().toString());
                             }
-                            if( parts.length > 1 ) {
+                            if (parts.length > 1) {
                                 connectionInfo.setPassword(parts[1].utf8().toString());
                             }
                             // We can't really auth at this point since we don't know the client id yet.. :(
                             sasl.done(Sasl.SaslOutcome.PN_SASL_OK);
                             amqpTransport.getWireFormat().magicRead = false;
                             sasl = null;
-                        } else if( "ANONYMOUS".equals(sasl.getRemoteMechanisms()[0]) ) {
+                        } else if ("ANONYMOUS".equals(sasl.getRemoteMechanisms()[0])) {
                             sasl.done(Sasl.SaslOutcome.PN_SASL_OK);
                             amqpTransport.getWireFormat().magicRead = false;
                             sasl = null;
@@ -246,12 +245,10 @@ class AmqpProtocolConverter {
                 // Lets map amqp sessions to openwire sessions..
                 Session session = protonConnection.sessionHead(UNINITIALIZED_SET, INITIALIZED_SET);
                 while (session != null) {
-
                     onSessionOpen(session);
                     session = protonConnection.sessionHead(UNINITIALIZED_SET, INITIALIZED_SET);
                 }
 
-
                 Link link = protonConnection.linkHead(UNINITIALIZED_SET, INITIALIZED_SET);
                 while (link != null) {
                     onLinkOpen(link);
@@ -269,21 +266,20 @@ class AmqpProtocolConverter {
 
                 link = protonConnection.linkHead(ACTIVE_STATE, CLOSED_STATE);
                 while (link != null) {
-                    ((AmqpDeliveryListener)link.getContext()).onClose();
+                    ((AmqpDeliveryListener) link.getContext()).onClose();
                     link.close();
                     link = link.next(ACTIVE_STATE, CLOSED_STATE);
                 }
 
                 link = protonConnection.linkHead(ACTIVE_STATE, ALL_STATES);
                 while (link != null) {
-                    ((AmqpDeliveryListener)link.getContext()).drainCheck();
+                    ((AmqpDeliveryListener) link.getContext()).drainCheck();
                     link = link.next(ACTIVE_STATE, ALL_STATES);
                 }
 
-
                 session = protonConnection.sessionHead(ACTIVE_STATE, CLOSED_STATE);
                 while (session != null) {
-                    //TODO - close links?
+                    // TODO - close links?
                     onSessionClose(session);
                     session = session.next(ACTIVE_STATE, CLOSED_STATE);
                 }
@@ -303,13 +299,13 @@ class AmqpProtocolConverter {
     boolean closedSocket = false;
 
     private void doClose() {
-        if( !closing ) {
+        if (!closing) {
             closing = true;
             sendToActiveMQ(new RemoveInfo(connectionId), new ResponseHandler() {
                 @Override
                 public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
                     protonConnection.close();
-                    if( !closedSocket) {
+                    if (!closedSocket) {
                         pumpProtonToSocket();
                     }
                 }
@@ -317,10 +313,9 @@ class AmqpProtocolConverter {
         }
     }
 
-
     public void onAMQPException(IOException error) {
         closedSocket = true;
-        if( !closing ) {
+        if (!closing) {
             amqpTransport.sendToActiveMQ(error);
         } else {
             try {
@@ -347,6 +342,9 @@ class AmqpProtocolConverter {
             MessageDispatch md = (MessageDispatch) command;
             ConsumerContext consumerContext = subscriptionsByConsumerId.get(md.getConsumerId());
             if (consumerContext != null) {
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Dispatching MessageId:{} to consumer", md.getMessage().getMessageId());
+                }
                 consumerContext.onMessageDispatch(md);
             }
         } else if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE) {
@@ -354,7 +352,7 @@ class AmqpProtocolConverter {
             Throwable exception = ((ConnectionError) command).getException();
             handleException(exception);
         } else if (command.isBrokerInfo()) {
-            //ignore
+            // ignore
         } else {
             LOG.debug("Do not know how to process ActiveMQ Command " + command);
         }
@@ -368,15 +366,19 @@ class AmqpProtocolConverter {
 
     static abstract class AmqpDeliveryListener {
         abstract public void onDelivery(Delivery delivery) throws Exception;
-        public void onClose() throws Exception {}
-        public void drainCheck() {}
+
+        public void onClose() throws Exception {
+        }
+
+        public void drainCheck() {
+        }
     }
 
     private void onConnectionOpen() throws AmqpProtocolException {
 
         connectionInfo.setResponseRequired(true);
         connectionInfo.setConnectionId(connectionId);
-//        configureInactivityMonitor(connect.keepAlive());
+        // configureInactivityMonitor(connect.keepAlive());
 
         String clientId = protonConnection.getRemoteContainer();
         if (clientId != null && !clientId.isEmpty()) {
@@ -393,8 +395,9 @@ class AmqpProtocolConverter {
 
                 if (response.isException()) {
                     Throwable exception = ((ExceptionResponse) response).getException();
-// TODO: figure out how to close /w an error.
-//                    protonConnection.setLocalError(new EndpointError(exception.getClass().getName(), exception.getMessage()));
+                    // TODO: figure out how to close /w an error.
+                    // protonConnection.setLocalError(new EndpointError(exception.getClass().getName(),
+                    // exception.getMessage()));
                     protonConnection.close();
                     pumpProtonToSocket();
                     amqpTransport.onException(IOExceptionSupport.create(exception));
@@ -413,8 +416,8 @@ class AmqpProtocolConverter {
     }
 
     private void onSessionClose(Session session) {
-        AmqpSessionContext sessionContext = (AmqpSessionContext)session.getContext();
-        if( sessionContext!=null ) {
+        AmqpSessionContext sessionContext = (AmqpSessionContext) session.getContext();
+        if (sessionContext != null) {
             System.out.println(sessionContext.sessionId);
             sendToActiveMQ(new RemoveInfo(sessionContext.sessionId), null);
             session.setContext(null);
@@ -436,7 +439,7 @@ class AmqpProtocolConverter {
 
     InboundTransformer inboundTransformer;
 
-    protected InboundTransformer getInboundTransformer()  {
+    protected InboundTransformer getInboundTransformer() {
         if (inboundTransformer == null) {
             String transformer = amqpTransport.getTransformer();
             if (transformer.equals(InboundTransformer.TRANSFORMER_JMS)) {
@@ -459,24 +462,24 @@ class AmqpProtocolConverter {
 
         @Override
         public void onDelivery(Delivery delivery) throws Exception {
-            Receiver receiver = ((Receiver)delivery.getLink());
-            if( !delivery.isReadable() ) {
+            Receiver receiver = ((Receiver) delivery.getLink());
+            if (!delivery.isReadable()) {
                 System.out.println("it was not readable!");
                 return;
             }
 
-            if( current==null ) {
+            if (current == null) {
                 current = new ByteArrayOutputStream();
             }
 
             int count;
-            byte data[] = new byte[1024*4];
-            while( (count = receiver.recv(data, 0, data.length)) > 0 ) {
+            byte data[] = new byte[1024 * 4];
+            while ((count = receiver.recv(data, 0, data.length)) > 0) {
                 current.write(data, 0, count);
             }
 
             // Expecting more deliveries..
-            if( count == 0 ) {
+            if (count == 0) {
                 return;
             }
 
@@ -505,16 +508,16 @@ class AmqpProtocolConverter {
             final ActiveMQMessage message = (ActiveMQMessage) getInboundTransformer().transform(em);
             current = null;
 
-            if( destination!=null ) {
+            if (destination != null) {
                 message.setJMSDestination(destination);
             }
             message.setProducerId(producerId);
-            if( message.getMessageId()==null ) {
+            if (message.getMessageId() == null) {
                 message.setMessageId(new MessageId(producerId, messageIdGenerator.getNextSequenceId()));
             }
 
             DeliveryState remoteState = delivery.getRemoteState();
-            if( remoteState!=null && remoteState instanceof TransactionalState) {
+            if (remoteState != null && remoteState instanceof TransactionalState) {
                 TransactionalState s = (TransactionalState) remoteState;
                 long txid = toLong(s.getTxnId());
                 message.setTransactionId(new LocalTransactionId(connectionId, txid));
@@ -524,9 +527,9 @@ class AmqpProtocolConverter {
             sendToActiveMQ(message, new ResponseHandler() {
                 @Override
                 public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
-                    if( !delivery.remotelySettled()  ) {
-                        if( response.isException() ) {
-                            ExceptionResponse er = (ExceptionResponse)response;
+                    if (!delivery.remotelySettled()) {
+                        if (response.isException()) {
+                            ExceptionResponse er = (ExceptionResponse) response;
                             Rejected rejected = new Rejected();
                             ErrorCondition condition = new ErrorCondition();
                             condition.setCondition(Symbol.valueOf("failed"));
@@ -541,13 +544,13 @@ class AmqpProtocolConverter {
                 }
             });
         }
-
     }
 
     long nextTransactionId = 0;
-    class Transaction {
 
+    class Transaction {
     }
+
     HashMap<Long, Transaction> transactions = new HashMap<Long, Transaction>();
 
     public byte[] toBytes(long value) {
@@ -561,7 +564,6 @@ class AmqpProtocolConverter {
         return buffer.bigEndianEditor().readLong();
     }
 
-
     AmqpDeliveryListener coordinatorContext = new BaseProducerContext() {
         @Override
         protected void onMessage(Receiver receiver, final Delivery delivery, Buffer buffer) throws Exception {
@@ -569,49 +571,48 @@ class AmqpProtocolConverter {
             MessageImpl msg = new MessageImpl();
             int offset = buffer.offset;
             int len = buffer.length;
-            while( len > 0 ) {
+            while (len > 0) {
                 final int decoded = msg.decode(buffer.data, offset, len);
-                assert decoded > 0: "Make progress decoding the message";
+                assert decoded > 0 : "Make progress decoding the message";
                 offset += decoded;
                 len -= decoded;
             }
 
-            Object action = ((AmqpValue)msg.getBody()).getValue();
-            System.out.println("COORDINATOR received: "+action+", ["+buffer+"]");
-            if( action instanceof Declare ) {
+            Object action = ((AmqpValue) msg.getBody()).getValue();
+            System.out.println("COORDINATOR received: " + action + ", [" + buffer + "]");
+            if (action instanceof Declare) {
                 Declare declare = (Declare) action;
-                if( declare.getGlobalId()!=null ) {
+                if (declare.getGlobalId() != null) {
                     throw new Exception("don't know how to handle a declare /w a set GlobalId");
                 }
 
                 long txid = nextTransactionId++;
                 TransactionInfo txinfo = new TransactionInfo(connectionId, new LocalTransactionId(connectionId, txid), TransactionInfo.BEGIN);
                 sendToActiveMQ(txinfo, null);
-                System.out.println("started transaction "+txid);
+                System.out.println("started transaction " + txid);
 
                 Declared declared = new Declared();
                 declared.setTxnId(new Binary(toBytes(txid)));
                 delivery.disposition(declared);
                 delivery.settle();
-
-            } else if( action instanceof Discharge) {
+            } else if (action instanceof Discharge) {
                 Discharge discharge = (Discharge) action;
                 long txid = toLong(discharge.getTxnId());
 
                 byte operation;
-                if( discharge.getFail() ) {
-                    System.out.println("rollback transaction "+txid);
-                    operation = TransactionInfo.ROLLBACK ;
+                if (discharge.getFail()) {
+                    System.out.println("rollback transaction " + txid);
+                    operation = TransactionInfo.ROLLBACK;
                 } else {
-                    System.out.println("commit transaction "+txid);
+                    System.out.println("commit transaction " + txid);
                     operation = TransactionInfo.COMMIT_ONE_PHASE;
                 }
                 TransactionInfo txinfo = new TransactionInfo(connectionId, new LocalTransactionId(connectionId, txid), operation);
                 sendToActiveMQ(txinfo, new ResponseHandler() {
                     @Override
                     public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
-                        if( response.isException() ) {
-                            ExceptionResponse er = (ExceptionResponse)response;
+                        if (response.isException()) {
+                            ExceptionResponse er = (ExceptionResponse) response;
                             Rejected rejected = new Rejected();
                             rejected.setError(createErrorCondition("failed", er.getException().getMessage()));
                             delivery.disposition(rejected);
@@ -620,20 +621,17 @@ class AmqpProtocolConverter {
                         pumpProtonToSocket();
                     }
                 });
-
             } else {
-                throw new Exception("Expected coordinator message type: "+action.getClass());
+                throw new Exception("Expected coordinator message type: " + action.getClass());
             }
         }
-
     };
 
-
     void onReceiverOpen(final Receiver receiver, AmqpSessionContext sessionContext) {
         // Client is producing to this receiver object
         org.apache.qpid.proton.amqp.transport.Target remoteTarget = receiver.getRemoteTarget();
         try {
-            if( remoteTarget instanceof Coordinator ) {
+            if (remoteTarget instanceof Coordinator) {
                 pumpProtonToSocket();
                 receiver.setContext(coordinatorContext);
                 receiver.flow(prefetch);
@@ -643,7 +641,7 @@ class AmqpProtocolConverter {
                 Target target = (Target) remoteTarget;
                 ProducerId producerId = new ProducerId(sessionContext.sessionId, sessionContext.nextProducerId++);
                 ActiveMQDestination dest;
-                if( target.getDynamic() ) {
+                if (target.getDynamic()) {
                     dest = createTempQueue();
                     Target actualTarget = new Target();
                     actualTarget.setAddress(dest.getQualifiedName());
@@ -665,7 +663,7 @@ class AmqpProtocolConverter {
                         if (response.isException()) {
                             receiver.setTarget(null);
                             Throwable exception = ((ExceptionResponse) response).getException();
-                            ((LinkImpl)receiver).setLocalError(new EndpointError(exception.getClass().getName(), exception.getMessage()));
+                            ((LinkImpl) receiver).setLocalError(new EndpointError(exception.getClass().getName(), exception.getMessage()));
                             receiver.close();
                         } else {
                             receiver.open();
@@ -676,37 +674,35 @@ class AmqpProtocolConverter {
             }
         } catch (AmqpProtocolException exception) {
             receiver.setTarget(null);
-            ((LinkImpl)receiver).setLocalError(new EndpointError(exception.getSymbolicName(), exception.getMessage()));
+            ((LinkImpl) receiver).setLocalError(new EndpointError(exception.getSymbolicName(), exception.getMessage()));
             receiver.close();
         }
     }
 
     private ActiveMQDestination createDestination(Object terminus) throws AmqpProtocolException {
-        if( terminus == null ) {
+        if (terminus == null) {
             return null;
-        } else if( terminus instanceof org.apache.qpid.proton.amqp.messaging.Source) {
-            org.apache.qpid.proton.amqp.messaging.Source source = (org.apache.qpid.proton.amqp.messaging.Source)terminus;
-            if( source.getAddress() == null || source.getAddress().length()==0) {
+        } else if (terminus instanceof org.apache.qpid.proton.amqp.messaging.Source) {
+            org.apache.qpid.proton.amqp.messaging.Source source = (org.apache.qpid.proton.amqp.messaging.Source) terminus;
+            if (source.getAddress() == null || source.getAddress().length() == 0) {
                 throw new AmqpProtocolException("amqp:invalid-field", "source address not set");
             }
             return ActiveMQDestination.createDestination(source.getAddress(), ActiveMQDestination.QUEUE_TYPE);
-        } else if( terminus instanceof org.apache.qpid.proton.amqp.messaging.Target) {
-            org.apache.qpid.proton.amqp.messaging.Target target = (org.apache.qpid.proton.amqp.messaging.Target)terminus;
-            if( target.getAddress() == null || target.getAddress().length()==0) {
+        } else if (terminus instanceof org.apache.qpid.proton.amqp.messaging.Target) {
+            org.apache.qpid.proton.amqp.messaging.Target target = (org.apache.qpid.proton.amqp.messaging.Target) terminus;
+            if (target.getAddress() == null || target.getAddress().length() == 0) {
                 throw new AmqpProtocolException("amqp:invalid-field", "target address not set");
             }
             return ActiveMQDestination.createDestination(target.getAddress(), ActiveMQDestination.QUEUE_TYPE);
-        } else if( terminus instanceof Coordinator ) {
-            Coordinator target = (Coordinator)terminus;
+        } else if (terminus instanceof Coordinator) {
             return null;
         } else {
-            throw new RuntimeException("Unexpected terminus type: "+terminus);
+            throw new RuntimeException("Unexpected terminus type: " + terminus);
         }
     }
 
     OutboundTransformer outboundTransformer = new AutoOutboundTransformer(ActiveMQJMSVendor.INSTANCE);
 
-
     class ConsumerContext extends AmqpDeliveryListener {
         private final ConsumerId consumerId;
         private final Sender sender;
@@ -739,15 +735,14 @@ class AmqpProtocolConverter {
         }
 
         void checkinTag(byte[] data) {
-            if( tagCache.size() < 1024 ) {
+            if (tagCache.size() < 1024) {
                 tagCache.add(data);
             }
         }
 
-
         @Override
         public void onClose() throws Exception {
-            if( !closed ) {
+            if (!closed) {
                 closed = true;
                 sendToActiveMQ(new RemoveInfo(consumerId), null);
             }
@@ -757,7 +752,7 @@ class AmqpProtocolConverter {
 
         // called when the connection receives a JMS message from ActiveMQ
         public void onMessageDispatch(MessageDispatch md) throws Exception {
-            if( !closed ) {
+            if (!closed) {
                 outbound.addLast(md);
                 pumpOutbound();
                 pumpProtonToSocket();
@@ -768,14 +763,14 @@ class AmqpProtocolConverter {
         Delivery currentDelivery;
 
         public void pumpOutbound() throws Exception {
-            while(!closed) {
+            while (!closed) {
 
-                while( currentBuffer !=null ) {
+                while (currentBuffer != null) {
                     int sent = sender.send(currentBuffer.data, currentBuffer.offset, currentBuffer.length);
-                    if( sent > 0 ) {
+                    if (sent > 0) {
                         currentBuffer.moveHead(sent);
-                        if( currentBuffer.length == 0 ) {
-                            if( presettle ) {
+                        if (currentBuffer.length == 0) {
+                            if (presettle) {
                                 settle(currentDelivery, MessageAck.INDIVIDUAL_ACK_TYPE);
                             } else {
                                 sender.advance();
@@ -788,34 +783,31 @@ class AmqpProtocolConverter {
                     }
                 }
 
-                if( outbound.isEmpty() ) {
+                if (outbound.isEmpty()) {
                     return;
                 }
 
                 final MessageDispatch md = outbound.removeFirst();
                 try {
                     final ActiveMQMessage jms = (ActiveMQMessage) md.getMessage();
-                    if( jms==null ) {
+                    if (jms == null) {
                         // It's the end of browse signal.
                         sender.drained();
                     } else {
                         jms.setRedeliveryCounter(md.getRedeliveryCounter());
                         jms.setReadOnlyBody(true);
                         final EncodedMessage amqp = outboundTransformer.transform(jms);
-                        if( amqp!=null && amqp.getLength() > 0 ) {
-
+                        if (amqp != null && amqp.getLength() > 0) {
                             currentBuffer = new Buffer(amqp.getArray(), amqp.getArrayOffset(), amqp.getLength());
-                            if( presettle ) {
+                            if (presettle) {
                                 currentDelivery = sender.delivery(EMPTY_BYTE_ARRAY, 0, 0);
                             } else {
                                 final byte[] tag = nextTag();
                                 currentDelivery = sender.delivery(tag, 0, tag.length);
                             }
                             currentDelivery.setContext(md);
-
                         } else {
                             // TODO: message could not be generated what now?
-
                         }
                     }
                 } catch (Exception e) {
@@ -826,11 +818,11 @@ class AmqpProtocolConverter {
 
         private void settle(final Delivery delivery, int ackType) throws Exception {
             byte[] tag = delivery.getTag();
-            if( tag !=null && tag.length>0 ) {
+            if (tag != null && tag.length > 0) {
                 checkinTag(tag);
             }
 
-            if( ackType == -1) {
+            if (ackType == -1) {
                 // we are going to settle, but redeliver.. we we won't yet ack to ActiveMQ
                 delivery.settle();
                 onMessageDispatch((MessageDispatch) delivery.getContext());
@@ -841,20 +833,24 @@ class AmqpProtocolConverter {
                 ack.setFirstMessageId(md.getMessage().getMessageId());
                 ack.setLastMessageId(md.getMessage().getMessageId());
                 ack.setMessageCount(1);
-                ack.setAckType((byte)ackType);
+                ack.setAckType((byte) ackType);
                 ack.setDestination(md.getDestination());
 
                 DeliveryState remoteState = delivery.getRemoteState();
-                if( remoteState!=null && remoteState instanceof TransactionalState) {
+                if (remoteState != null && remoteState instanceof TransactionalState) {
                     TransactionalState s = (TransactionalState) remoteState;
                     long txid = toLong(s.getTxnId());
                     ack.setTransactionId(new LocalTransactionId(connectionId, txid));
                 }
 
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Sending Ack for MessageId:{} to ActiveMQ", ack.getLastMessageId());
+                }
+
                 sendToActiveMQ(ack, new ResponseHandler() {
                     @Override
                     public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
-                        if( response.isException() ) {
+                        if (response.isException()) {
                             if (response.isException()) {
                                 Throwable exception = ((ExceptionResponse) response).getException();
                                 exception.printStackTrace();
@@ -871,7 +867,7 @@ class AmqpProtocolConverter {
 
         @Override
         public void drainCheck() {
-            if( outbound.isEmpty() ) {
+            if (outbound.isEmpty()) {
                 sender.drained();
             }
         }
@@ -880,27 +876,27 @@ class AmqpProtocolConverter {
         public void onDelivery(Delivery delivery) throws Exception {
             MessageDispatch md = (MessageDispatch) delivery.getContext();
             final DeliveryState state = delivery.getRemoteState();
-            if( state instanceof Accepted ) {
-                if( !delivery.remotelySettled() ) {
+            if (state instanceof Accepted) {
+                if (!delivery.remotelySettled()) {
                     delivery.disposition(new Accepted());
                 }
                 settle(delivery, MessageAck.INDIVIDUAL_ACK_TYPE);
-            } else if( state instanceof Rejected) {
+            } else if (state instanceof Rejected) {
                 // re-deliver /w incremented delivery counter.
                 md.setRedeliveryCounter(md.getRedeliveryCounter() + 1);
                 settle(delivery, -1);
-            } else if( state instanceof Released) {
+            } else if (state instanceof Released) {
                 // re-deliver && don't increment the counter.
                 settle(delivery, -1);
-            } else if( state instanceof Modified) {
+            } else if (state instanceof Modified) {
                 Modified modified = (Modified) state;
-                if ( modified.getDeliveryFailed() ) {
-                  // increment delivery counter..
-                  md.setRedeliveryCounter(md.getRedeliveryCounter() + 1);
+                if (modified.getDeliveryFailed()) {
+                    // increment delivery counter..
+                    md.setRedeliveryCounter(md.getRedeliveryCounter() + 1);
                 }
                 byte ackType = -1;
                 Boolean undeliverableHere = modified.getUndeliverableHere();
-                if( undeliverableHere !=null && undeliverableHere ) {
+                if (undeliverableHere != null && undeliverableHere) {
                     // receiver does not want the message..
                     // perhaps we should DLQ it?
                     ackType = MessageAck.POSION_ACK_TYPE;
@@ -909,13 +905,12 @@ class AmqpProtocolConverter {
             }
             pumpOutbound();
         }
-
     }
 
     private final ConcurrentHashMap<ConsumerId, ConsumerContext> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, ConsumerContext>();
 
     void onSenderOpen(final Sender sender, AmqpSessionContext sessionContext) {
-        org.apache.qpid.proton.amqp.messaging.Source source = (org.apache.qpid.proton.amqp.messaging.Source)sender.getRemoteSource();
+        org.apache.qpid.proton.amqp.messaging.Source source = (org.apache.qpid.proton.amqp.messaging.Source) sender.getRemoteSource();
 
         try {
             final ConsumerId id = new ConsumerId(sessionContext.sessionId, sessionContext.nextConsumerId++);
@@ -923,18 +918,18 @@ class AmqpProtocolConverter {
             sender.setContext(consumerContext);
 
             String selector = null;
-            if( source!=null ) {
+            if (source != null) {
                 Map filter = source.getFilter();
                 if (filter != null) {
-                    DescribedType value = (DescribedType)filter.get(JMS_SELECTOR);
-                    if( value!=null ) {
+                    DescribedType value = (DescribedType) filter.get(JMS_SELECTOR);
+                    if (value != null) {
                         selector = value.getDescribed().toString();
                         // Validate the Selector.
                         try {
                             SelectorParser.parse(selector);
                         } catch (InvalidSelectorException e) {
                             sender.setSource(null);
-                            ((LinkImpl)sender).setLocalError(new EndpointError("amqp:invalid-field", e.getMessage()));
+                            ((LinkImpl) sender).setLocalError(new EndpointError("amqp:invalid-field", e.getMessage()));
                             sender.close();
                             consumerContext.closed = true;
                             return;
@@ -944,7 +939,7 @@ class AmqpProtocolConverter {
             }
 
             ActiveMQDestination dest;
-            if( source == null ) {
+            if (source == null) {
 
                 source = new org.apache.qpid.proton.amqp.messaging.Source();
                 source.setAddress("");
@@ -957,7 +952,7 @@ class AmqpProtocolConverter {
                 rsi.setSubscriptionName(sender.getName());
                 rsi.setClientId(connectionInfo.getClientId());
 
-                consumerContext.closed=true;
+                consumerContext.closed = true;
                 sendToActiveMQ(rsi, new ResponseHandler() {
                     @Override
                     public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
@@ -965,19 +960,19 @@ class AmqpProtocolConverter {
                             sender.setSource(null);
                             Throwable exception = ((ExceptionResponse) response).getException();
                             String name = exception.getClass().getName();
-                            ((LinkImpl)sender).setLocalError(new EndpointError(name, exception.getMessage()));
+                            ((LinkImpl) sender).setLocalError(new EndpointError(name, exception.getMessage()));
                         }
                         sender.open();
                         pumpProtonToSocket();
                     }
                 });
                 return;
-            } else if( contains(source.getCapabilities(), DURABLE_SUBSCRIPTION_ENDED) ) {
-                consumerContext.closed=true;
+            } else if (contains(source.getCapabilities(), DURABLE_SUBSCRIPTION_ENDED)) {
+                consumerContext.closed = true;
                 sender.close();
                 pumpProtonToSocket();
                 return;
-            } else if( source.getDynamic() ) {
+            } else if (source.getDynamic()) {
                 // lets create a temp dest.
                 dest = createTempQueue();
                 source = new org.apache.qpid.proton.amqp.messaging.Source();
@@ -995,17 +990,17 @@ class AmqpProtocolConverter {
             consumerInfo.setDestination(dest);
             consumerInfo.setPrefetchSize(100);
             consumerInfo.setDispatchAsync(true);
-            if( source.getDistributionMode() == COPY && dest.isQueue() ) {
+            if (source.getDistributionMode() == COPY && dest.isQueue()) {
                 consumerInfo.setBrowser(true);
             }
-            if( DURABLE.equals(source.getDurable()) && dest.isTopic() ) {
+            if (DURABLE.equals(source.getDurable()) && dest.isTopic()) {
                 consumerInfo.setSubscriptionName(sender.getName());
             }
 
             Map filter = source.getFilter();
             if (filter != null) {
-                DescribedType value = (DescribedType)filter.get(NO_LOCAL);
-                if( value!=null ) {
+                DescribedType value = (DescribedType) filter.get(NO_LOCAL);
+                if (value != null) {
                     consumerInfo.setNoLocal(true);
                 }
             }
@@ -1017,10 +1012,10 @@ class AmqpProtocolConverter {
                         sender.setSource(null);
                         Throwable exception = ((ExceptionResponse) response).getException();
                         String name = exception.getClass().getName();
-                        if( exception instanceof InvalidSelectorException ) {
+                        if (exception instanceof InvalidSelectorException) {
                             name = "amqp:invalid-field";
                         }
-                        ((LinkImpl)sender).setLocalError(new EndpointError(name, exception.getMessage()));
+                        ((LinkImpl) sender).setLocalError(new EndpointError(name, exception.getMessage()));
                         subscriptionsByConsumerId.remove(id);
                         sender.close();
                     } else {
@@ -1031,15 +1026,15 @@ class AmqpProtocolConverter {
             });
         } catch (AmqpProtocolException e) {
             sender.setSource(null);
-            ((LinkImpl)sender).setLocalError(new EndpointError(e.getSymbolicName(), e.getMessage()));
+            ((LinkImpl) sender).setLocalError(new EndpointError(e.getSymbolicName(), e.getMessage()));
             sender.close();
         }
     }
 
     static private boolean contains(Symbol[] haystack, Symbol needle) {
-        if( haystack!=null ) {
+        if (haystack != null) {
             for (Symbol capability : haystack) {
-                if( capability == needle) {
+                if (capability == needle) {
                     return true;
                 }
             }
@@ -1058,11 +1053,11 @@ class AmqpProtocolConverter {
         return rc;
     }
 
-    ////////////////////////////////////////////////////////////////////////////
+    // //////////////////////////////////////////////////////////////////////////
     //
     // Implementation methods
     //
-    ////////////////////////////////////////////////////////////////////////////
+    // //////////////////////////////////////////////////////////////////////////
 
     private final Object commnadIdMutex = new Object();
     private int lastCommandId;
@@ -1106,5 +1101,4 @@ class AmqpProtocolConverter {
         condition.setDescription(description);
         return condition;
     }
-
 }

Modified: activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4563Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4563Test.java?rev=1487950&r1=1487949&r2=1487950&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4563Test.java (original)
+++ activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4563Test.java Thu May 30 18:07:54 2013
@@ -23,6 +23,8 @@ import static org.junit.Assert.assertTru
 import java.io.File;
 
 import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
 import javax.jms.ExceptionListener;
 import javax.jms.JMSException;
 import javax.jms.Message;
@@ -31,6 +33,7 @@ import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.jms.TextMessage;
 
+import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.spring.SpringSslContext;
 import org.apache.activemq.store.kahadb.KahaDBStore;
@@ -43,14 +46,47 @@ public class AMQ4563Test extends AmqpTes
 
     public static final String KAHADB_DIRECTORY = "target/activemq-data/kahadb-amq4563";
 
+    private String openwireUri;
+
+    @Test(timeout = 60000)
+    public void testMessagesAreAckedAMQProducer() throws Exception {
+        int messagesSent = 3;
+        ActiveMQAdmin.enableJMSFrameTracing();
+        QueueImpl queue = new QueueImpl("queue://txqueue");
+        assertTrue(brokerService.isPersistent());
+
+        Connection connection = createAMQConnection();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Destination destination = session.createQueue("txqueue");
+        MessageProducer p = session.createProducer(destination);
+        TextMessage message = null;
+        for (int i=0; i < messagesSent; i++) {
+            message = session.createTextMessage();
+            String messageText = "Hello " + i + " sent at " + new java.util.Date().toString();
+            message.setText(messageText);
+            LOG.debug(">>>> Sent [" + messageText + "]");
+            p.send(message);
+        }
+
+        // After the first restart we should get all messages sent above
+        restartBroker(connection, session);
+        int messagesReceived = readAllMessages(queue);
+        assertEquals(messagesSent, messagesReceived);
+
+        // This time there should be no messages on this queue
+        restartBroker(connection, session);
+        messagesReceived = readAllMessages(queue);
+        assertEquals(0, messagesReceived);
+    }
+
     @Test(timeout = 60000)
-    public void testTransactions() throws Exception {
+    public void testMessagesAreAckedAMQPProducer() throws Exception {
         int messagesSent = 3;
         ActiveMQAdmin.enableJMSFrameTracing();
         QueueImpl queue = new QueueImpl("queue://txqueue");
         assertTrue(brokerService.isPersistent());
 
-        Connection connection = createConnection();
+        Connection connection = createAMQPConnection();
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         MessageProducer p = session.createProducer(queue);
         TextMessage message = null;
@@ -74,7 +110,7 @@ public class AMQ4563Test extends AmqpTes
     }
 
     private int readAllMessages(QueueImpl queue) throws JMSException {
-        Connection connection = createConnection();
+        Connection connection = createAMQPConnection();
         try {
             Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
             int messagesReceived = 0;
@@ -104,7 +140,7 @@ public class AMQ4563Test extends AmqpTes
         createBroker(false);
     }
 
-    private Connection createConnection() throws JMSException {
+    private Connection createAMQPConnection() throws JMSException {
         LOG.debug(">>> In createConnection using port " + port);
         final ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", port, "admin", "password");
         final Connection connection = factory.createConnection();
@@ -118,6 +154,20 @@ public class AMQ4563Test extends AmqpTes
         return connection;
     }
 
+    private Connection createAMQConnection() throws JMSException {
+        LOG.debug(">>> In createConnection using port " + port);
+        final ConnectionFactory factory = new ActiveMQConnectionFactory("admin", "password", openwireUri);
+        final Connection connection = factory.createConnection();
+        connection.setExceptionListener(new ExceptionListener() {
+            @Override
+            public void onException(JMSException exception) {
+                exception.printStackTrace();
+            }
+        });
+        connection.start();
+        return connection;
+    }
+
     @Override
     public void startBroker() throws Exception {
         createBroker(true);
@@ -136,6 +186,7 @@ public class AMQ4563Test extends AmqpTes
         brokerService.setPersistenceAdapter(kaha);
         brokerService.setAdvisorySupport(false);
         brokerService.setUseJmx(false);
+        openwireUri = brokerService.addConnector("tcp://0.0.0.0:0").getPublishableConnectString();
 
         // Setup SSL context...
         final File classesDir = new File(AmqpProtocolConverter.class.getProtectionDomain().getCodeSource().getLocation().getFile());