You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/08/01 00:05:28 UTC

svn commit: r1509046 - in /activemq/trunk/activemq-amqp/src: main/java/org/apache/activemq/transport/amqp/ test/java/org/apache/activemq/transport/amqp/

Author: tabish
Date: Wed Jul 31 22:05:27 2013
New Revision: 1509046

URL: http://svn.apache.org/r1509046
Log:
fix and tests for: https://issues.apache.org/jira/browse/AMQ-4651

Modified:
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
    activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
    activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java

Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java?rev=1509046&r1=1509045&r2=1509046&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java Wed Jul 31 22:05:27 2013
@@ -102,6 +102,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 class AmqpProtocolConverter {
+
     static final Logger TRACE_FRAMES = AmqpTransportFilter.TRACE_FRAMES;
     public static final EnumSet<EndpointState> UNINITIALIZED_SET = EnumSet.of(EndpointState.UNINITIALIZED);
     public static final EnumSet<EndpointState> INITIALIZED_SET = EnumSet.complementOf(UNINITIALIZED_SET);
@@ -151,7 +152,6 @@ class AmqpProtocolConverter {
         }
     }
 
-
     void pumpProtonToSocket() {
         try {
             int size = 1024 * 64;
@@ -179,6 +179,8 @@ class AmqpProtocolConverter {
         long nextProducerId = 0;
         long nextConsumerId = 0;
 
+        final LinkedList<ConsumerContext> consumers = new LinkedList<ConsumerContext>();
+
         public AmqpSessionContext(ConnectionId connectionId, long id) {
             sessionId = new SessionId(connectionId, id);
         }
@@ -368,7 +370,9 @@ class AmqpProtocolConverter {
         } else if (command.isBrokerInfo()) {
             // ignore
         } else {
-            LOG.debug("Do not know how to process ActiveMQ Command " + command);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Do not know how to process ActiveMQ Command " + command);
+            }
         }
     }
 
@@ -379,13 +383,16 @@ class AmqpProtocolConverter {
     private long nextTempDestinationId = 0;
 
     static abstract class AmqpDeliveryListener {
+
         abstract public void onDelivery(Delivery delivery) throws Exception;
 
-        public void onClose() throws Exception {
-        }
+        public void onClose() throws Exception {}
 
-        public void drainCheck() {
-        }
+        public void drainCheck() {}
+
+        abstract void doCommit() throws Exception;
+
+        abstract void doRollback() throws Exception;
     }
 
     private void onConnectionOpen() throws AmqpProtocolException {
@@ -505,6 +512,12 @@ class AmqpProtocolConverter {
             onMessage(receiver, delivery, buffer);
         }
 
+        @Override
+        void doCommit() throws Exception {}
+
+        @Override
+        void doRollback() throws Exception {}
+
         abstract protected void onMessage(Receiver receiver, Delivery delivery, Buffer buffer) throws Exception;
     }
 
@@ -573,7 +586,7 @@ class AmqpProtocolConverter {
         }
     }
 
-    long nextTransactionId = 0;
+    long nextTransactionId = 1;
 
     class Transaction {
     }
@@ -592,6 +605,7 @@ class AmqpProtocolConverter {
     }
 
     AmqpDeliveryListener coordinatorContext = new BaseProducerContext() {
+
         @Override
         protected void onMessage(Receiver receiver, final Delivery delivery, Buffer buffer) throws Exception {
 
@@ -605,7 +619,7 @@ class AmqpProtocolConverter {
                 len -= decoded;
             }
 
-            Object action = ((AmqpValue) msg.getBody()).getValue();
+            final Object action = ((AmqpValue) msg.getBody()).getValue();
             LOG.debug("COORDINATOR received: " + action + ", [" + buffer + "]");
             if (action instanceof Declare) {
                 Declare declare = (Declare) action;
@@ -628,7 +642,7 @@ class AmqpProtocolConverter {
                 Discharge discharge = (Discharge) action;
                 long txid = toLong(discharge.getTxnId());
 
-                byte operation;
+                final byte operation;
                 if (discharge.getFail()) {
                     if (LOG.isTraceEnabled()) {
                         LOG.trace("rollback transaction " + txid);
@@ -640,6 +654,16 @@ class AmqpProtocolConverter {
                     }
                     operation = TransactionInfo.COMMIT_ONE_PHASE;
                 }
+
+                AmqpSessionContext context = (AmqpSessionContext) receiver.getSession().getContext();
+                for (ConsumerContext consumer : context.consumers) {
+                    if (operation == TransactionInfo.ROLLBACK) {
+                        consumer.doRollback();
+                    } else {
+                        consumer.doCommit();
+                    }
+                }
+
                 TransactionInfo txinfo = new TransactionInfo(connectionId, new LocalTransactionId(connectionId, txid), operation);
                 sendToActiveMQ(txinfo, new ResponseHandler() {
                     @Override
@@ -650,10 +674,20 @@ class AmqpProtocolConverter {
                             rejected.setError(createErrorCondition("failed", er.getException().getMessage()));
                             delivery.disposition(rejected);
                         }
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("TX: {} settling {}", operation, action);
+                        }
                         delivery.settle();
                         pumpProtonToSocket();
                     }
                 });
+
+                for (ConsumerContext consumer : context.consumers) {
+                    if (operation == TransactionInfo.ROLLBACK) {
+                        consumer.pumpOutbound();
+                    }
+                }
+
             } else {
                 throw new Exception("Expected coordinator message type: " + action.getClass());
             }
@@ -744,6 +778,7 @@ class AmqpProtocolConverter {
         public ConsumerInfo info;
         private boolean endOfBrowse = false;
 
+        protected LinkedList<MessageDispatch> dispatchedInTx = new LinkedList<MessageDispatch>();
 
         public ConsumerContext(ConsumerId consumerId, Sender sender) {
             this.consumerId = consumerId;
@@ -789,7 +824,10 @@ class AmqpProtocolConverter {
         // called when the connection receives a JMS message from ActiveMQ
         public void onMessageDispatch(MessageDispatch md) throws Exception {
             if (!closed) {
-                outbound.addLast(md);
+                // Lock to prevent stepping on TX redelivery
+                synchronized (outbound) {
+                    outbound.addLast(md);
+                }
                 pumpOutbound();
                 pumpProtonToSocket();
             }
@@ -853,7 +891,7 @@ class AmqpProtocolConverter {
             }
         }
 
-        private void settle(final Delivery delivery, int ackType) throws Exception {
+        private void settle(final Delivery delivery, final int ackType) throws Exception {
             byte[] tag = delivery.getTag();
             if (tag != null && tag.length > 0) {
                 checkinTag(tag);
@@ -877,11 +915,16 @@ class AmqpProtocolConverter {
                 if (remoteState != null && remoteState instanceof TransactionalState) {
                     TransactionalState s = (TransactionalState) remoteState;
                     long txid = toLong(s.getTxnId());
-                    ack.setTransactionId(new LocalTransactionId(connectionId, txid));
+                    LocalTransactionId localTxId = new LocalTransactionId(connectionId, txid);
+                    ack.setTransactionId(localTxId);
+
+                    // Store the message sent in this TX we might need to re-send on rollback
+                    md.getMessage().setTransactionId(localTxId);
+                    dispatchedInTx.addFirst(md);
                 }
 
                 if (LOG.isTraceEnabled()) {
-                    LOG.trace("Sending Ack for MessageId:{} to ActiveMQ", ack.getLastMessageId());
+                    LOG.trace("Sending Ack to ActiveMQ: {}", ack);
                 }
 
                 sendToActiveMQ(ack, new ResponseHandler() {
@@ -917,46 +960,129 @@ class AmqpProtocolConverter {
         @Override
         public void onDelivery(Delivery delivery) throws Exception {
             MessageDispatch md = (MessageDispatch) delivery.getContext();
-            final DeliveryState state = delivery.getRemoteState();
-            if (state instanceof Accepted) {
-                if (!delivery.remotelySettled()) {
-                    delivery.disposition(new Accepted());
-                }
-                settle(delivery, MessageAck.INDIVIDUAL_ACK_TYPE);
-            } else if (state instanceof Rejected) {
-                // re-deliver /w incremented delivery counter.
-                md.setRedeliveryCounter(md.getRedeliveryCounter() + 1);
-                settle(delivery, -1);
-            } else if (state instanceof Released) {
-                // re-deliver && don't increment the counter.
-                settle(delivery, -1);
-            } else if (state instanceof Modified) {
-                Modified modified = (Modified) state;
-                if (modified.getDeliveryFailed()) {
-                    // increment delivery counter..
-                    md.setRedeliveryCounter(md.getRedeliveryCounter() + 1);
+            DeliveryState state = delivery.getRemoteState();
+
+            if (state instanceof TransactionalState) {
+                TransactionalState txState = (TransactionalState) state;
+                if (txState.getOutcome() instanceof DeliveryState) {
+
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace("onDelivery: TX delivery state = {}", state);
+                    }
+
+                    state = (DeliveryState) txState.getOutcome();
+
+                    if (state instanceof Accepted) {
+                        if (!delivery.remotelySettled()) {
+                            delivery.disposition(new Accepted());
+                        }
+                        settle(delivery, MessageAck.DELIVERED_ACK_TYPE);
+                    }
                 }
-                byte ackType = -1;
-                Boolean undeliverableHere = modified.getUndeliverableHere();
-                if (undeliverableHere != null && undeliverableHere) {
-                    // receiver does not want the message..
-                    // perhaps we should DLQ it?
-                    ackType = MessageAck.POSION_ACK_TYPE;
+            } else {
+                if (state instanceof Accepted) {
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace("onDelivery: accepted state = {}", state);
+                    }
+
+                    if (!delivery.remotelySettled()) {
+                        delivery.disposition(new Accepted());
+                    }
+                    settle(delivery, MessageAck.INDIVIDUAL_ACK_TYPE);
+                } else if (state instanceof Rejected) {
+                    // re-deliver /w incremented delivery counter.
+                    md.setRedeliveryCounter(md.getRedeliveryCounter() + 1);
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace("onDelivery: Rejected state = {}, delivery count now {}", state, md.getRedeliveryCounter());
+                    }
+                    settle(delivery, -1);
+                } else if (state instanceof Released) {
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace("onDelivery: Released state = {}", state);
+                    }
+                    // re-deliver && don't increment the counter.
+                    settle(delivery, -1);
+                } else if (state instanceof Modified) {
+                    Modified modified = (Modified) state;
+                    if (modified.getDeliveryFailed()) {
+                        // increment delivery counter..
+                        md.setRedeliveryCounter(md.getRedeliveryCounter() + 1);
+                    }
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace("onDelivery: Modified state = {}, delivery count now {}", state, md.getRedeliveryCounter());
+                    }
+                    byte ackType = -1;
+                    Boolean undeliverableHere = modified.getUndeliverableHere();
+                    if (undeliverableHere != null && undeliverableHere) {
+                        // receiver does not want the message..
+                        // perhaps we should DLQ it?
+                        ackType = MessageAck.POSION_ACK_TYPE;
+                    }
+                    settle(delivery, ackType);
                 }
-                settle(delivery, ackType);
             }
             pumpOutbound();
         }
+
+        @Override
+        void doCommit() throws Exception {
+            if (!dispatchedInTx.isEmpty()) {
+
+                MessageDispatch md = dispatchedInTx.getFirst();
+                MessageAck pendingTxAck = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, dispatchedInTx.size());
+                pendingTxAck.setTransactionId(md.getMessage().getTransactionId());
+                pendingTxAck.setFirstMessageId(dispatchedInTx.getLast().getMessage().getMessageId());
+
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Sending commit Ack to ActiveMQ: {}", pendingTxAck);
+                }
+
+                dispatchedInTx.clear();
+
+                sendToActiveMQ(pendingTxAck, new ResponseHandler() {
+                    @Override
+                    public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
+                        if (response.isException()) {
+                            if (response.isException()) {
+                                Throwable exception = ((ExceptionResponse) response).getException();
+                                exception.printStackTrace();
+                                sender.close();
+                            }
+                        }
+                        pumpProtonToSocket();
+                    }
+                });
+            }
+        }
+
+        @Override
+        void doRollback() throws Exception {
+            synchronized (outbound) {
+
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Rolling back {} messages for redelivery. ", dispatchedInTx.size());
+                }
+
+                for (MessageDispatch md : dispatchedInTx) {
+                    md.setRedeliveryCounter(md.getRedeliveryCounter() + 1);
+                    md.getMessage().setTransactionId(null);
+                    outbound.addFirst(md);
+                }
+
+                dispatchedInTx.clear();
+            }
+        }
     }
 
     private final ConcurrentHashMap<ConsumerId, ConsumerContext> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, ConsumerContext>();
 
-    void onSenderOpen(final Sender sender, AmqpSessionContext sessionContext) {
+    @SuppressWarnings("rawtypes")
+    void onSenderOpen(final Sender sender, final AmqpSessionContext sessionContext) {
         org.apache.qpid.proton.amqp.messaging.Source source = (org.apache.qpid.proton.amqp.messaging.Source) sender.getRemoteSource();
 
         try {
             final ConsumerId id = new ConsumerId(sessionContext.sessionId, sessionContext.nextConsumerId++);
-            ConsumerContext consumerContext = new ConsumerContext(id, sender);
+            final ConsumerContext consumerContext = new ConsumerContext(id, sender);
             sender.setContext(consumerContext);
 
             String selector = null;
@@ -1062,6 +1188,7 @@ class AmqpProtocolConverter {
                         subscriptionsByConsumerId.remove(id);
                         sender.close();
                     } else {
+                        sessionContext.consumers.add(consumerContext);
                         sender.open();
                     }
                     pumpProtonToSocket();

Modified: activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java?rev=1509046&r1=1509045&r2=1509046&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java (original)
+++ activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java Wed Jul 31 22:05:27 2013
@@ -114,7 +114,7 @@ public class AmqpTestSupport {
             p.send(message);
         }
 
-        p.close();
+        session.close();
     }
 
     protected BrokerViewMBean getProxyToBroker() throws MalformedObjectNameException, JMSException {

Modified: activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java?rev=1509046&r1=1509045&r2=1509046&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java (original)
+++ activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java Wed Jul 31 22:05:27 2013
@@ -37,7 +37,9 @@ import org.apache.activemq.broker.jmx.Qu
 import org.apache.activemq.transport.amqp.joram.ActiveMQAdmin;
 import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
 import org.apache.qpid.amqp_1_0.jms.impl.QueueImpl;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TestName;
 import org.objectweb.jtests.jms.framework.TestConfig;
 
 /**
@@ -45,11 +47,13 @@ import org.objectweb.jtests.jms.framewor
  */
 public class JMSClientTest extends AmqpTestSupport {
 
+    @Rule public TestName name = new TestName();
+
     @SuppressWarnings("rawtypes")
     @Test
     public void testProducerConsume() throws Exception {
         ActiveMQAdmin.enableJMSFrameTracing();
-        QueueImpl queue = new QueueImpl("queue://txqueue");
+        QueueImpl queue = new QueueImpl("queue://" + name);
 
         Connection connection = createConnection();
         {
@@ -78,32 +82,29 @@ public class JMSClientTest extends AmqpT
     @Test
     public void testTransactedConsumer() throws Exception {
         ActiveMQAdmin.enableJMSFrameTracing();
-        QueueImpl queue = new QueueImpl("queue://txqueue");
-        final int msgCount = 10;
+        QueueImpl queue = new QueueImpl("queue://" + name);
+        final int msgCount = 1;
 
         Connection connection = createConnection();
         sendMessages(connection, queue, msgCount);
 
-        QueueViewMBean queueView = getProxyToQueue("txqueue");
+        QueueViewMBean queueView = getProxyToQueue(name.toString());
         LOG.info("Queue size after produce is: {}", queueView.getQueueSize());
         assertEquals(msgCount, queueView.getQueueSize());
 
-        // Consumer all in TX and commit.
-        {
-            Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
-            MessageConsumer consumer = session.createConsumer(queue);
+        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createConsumer(queue);
 
-            for (int i = 0; i < msgCount; ++i) {
-                Message msg = consumer.receive(TestConfig.TIMEOUT);
-                assertNotNull(msg);
-                assertTrue(msg instanceof TextMessage);
-            }
+        Message msg = consumer.receive(TestConfig.TIMEOUT);
+        assertNotNull(msg);
+        assertTrue(msg instanceof TextMessage);
 
-            consumer.close();
-            session.commit();
-        }
+        LOG.info("Queue size before session commit is: {}", queueView.getQueueSize());
+        assertEquals(msgCount, queueView.getQueueSize());
+
+        session.commit();
 
-        LOG.info("Queue size after consumer commit is: {}", queueView.getQueueSize());
+        LOG.info("Queue size after session commit is: {}", queueView.getQueueSize());
         assertEquals(0, queueView.getQueueSize());
 
         connection.close();
@@ -113,13 +114,13 @@ public class JMSClientTest extends AmqpT
     public void testRollbackRececeivedMessage() throws Exception {
 
         ActiveMQAdmin.enableJMSFrameTracing();
-        QueueImpl queue = new QueueImpl("queue://txqueue");
+        QueueImpl queue = new QueueImpl("queue://" + name);
         final int msgCount = 1;
 
         Connection connection = createConnection();
         sendMessages(connection, queue, msgCount);
 
-        QueueViewMBean queueView = getProxyToQueue("txqueue");
+        QueueViewMBean queueView = getProxyToQueue(name.toString());
         LOG.info("Queue size after produce is: {}", queueView.getQueueSize());
         assertEquals(msgCount, queueView.getQueueSize());
 
@@ -128,6 +129,7 @@ public class JMSClientTest extends AmqpT
 
         // Receive and roll back, first receive should not show redelivered.
         Message msg = consumer.receive(TestConfig.TIMEOUT);
+        LOG.info("Test received msg: {}", msg);
         assertNotNull(msg);
         assertTrue(msg instanceof TextMessage);
         assertEquals(false, msg.getJMSRedelivered());
@@ -147,19 +149,22 @@ public class JMSClientTest extends AmqpT
 
         LOG.info("Queue size after produce is: {}", queueView.getQueueSize());
         assertEquals(0, queueView.getQueueSize());
+
+        session.close();
+        connection.close();
     }
 
     @Test
     public void testTXConsumerAndLargeNumberOfMessages() throws Exception {
 
         ActiveMQAdmin.enableJMSFrameTracing();
-        QueueImpl queue = new QueueImpl("queue://txqueue");
+        QueueImpl queue = new QueueImpl("queue://" + name);
         final int msgCount = 500;
 
         Connection connection = createConnection();
         sendMessages(connection, queue, msgCount);
 
-        QueueViewMBean queueView = getProxyToQueue("txqueue");
+        QueueViewMBean queueView = getProxyToQueue(name.toString());
         LOG.info("Queue size after produce is: {}", queueView.getQueueSize());
         assertEquals(msgCount, queueView.getQueueSize());
 
@@ -177,10 +182,13 @@ public class JMSClientTest extends AmqpT
                 assertTrue(msg instanceof TextMessage);
             }
 
-            consumer.close();
             session.commit();
+            consumer.close();
+            session.close();
         }
 
+        connection.close();
+
         LOG.info("Queue size after produce is: {}", queueView.getQueueSize());
         assertEquals(0, queueView.getQueueSize());
     }
@@ -189,7 +197,7 @@ public class JMSClientTest extends AmqpT
     @Test
     public void testSelectors() throws Exception{
         ActiveMQAdmin.enableJMSFrameTracing();
-        QueueImpl queue = new QueueImpl("queue://txqueue");
+        QueueImpl queue = new QueueImpl("queue://" + name);
 
         Connection connection = createConnection();
         {