You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2014/12/01 20:16:26 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-5467

Repository: activemq
Updated Branches:
  refs/heads/trunk 9797d3b95 -> 9edf907ae


https://issues.apache.org/jira/browse/AMQ-5467

Apply patch to use individual ack for messages in a TX to avoid
unmatched ack when ack range is non-sequential

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/9edf907a
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/9edf907a
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/9edf907a

Branch: refs/heads/trunk
Commit: 9edf907aedb8c0337577235467061aade237d72d
Parents: 9797d3b
Author: Timothy Bish <ta...@gmail.com>
Authored: Mon Dec 1 14:16:01 2014 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Mon Dec 1 14:16:01 2014 -0500

----------------------------------------------------------------------
 .../transport/amqp/AmqpProtocolConverter.java   | 36 ++++++-------
 .../transport/amqp/AmqpTestSupport.java         |  5 +-
 .../activemq/transport/amqp/JMSClientTest.java  | 55 ++++++++++++++++++--
 3 files changed, 73 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/9edf907a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
index a08e08b..9a252f2 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
@@ -1203,29 +1203,29 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
         @Override
         void doCommit() throws Exception {
             if (!dispatchedInTx.isEmpty()) {
+                for (MessageDispatch md : dispatchedInTx) {
+                    MessageAck pendingTxAck = new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1);
+                    pendingTxAck.setFirstMessageId(md.getMessage().getMessageId());
+                    pendingTxAck.setTransactionId(md.getMessage().getTransactionId());
 
-                MessageDispatch md = dispatchedInTx.getFirst();
-                MessageAck pendingTxAck = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, dispatchedInTx.size());
-                pendingTxAck.setTransactionId(md.getMessage().getTransactionId());
-                pendingTxAck.setFirstMessageId(dispatchedInTx.getLast().getMessage().getMessageId());
-
-                LOG.trace("Sending commit Ack to ActiveMQ: {}", pendingTxAck);
-
-                dispatchedInTx.clear();
+                    LOG.trace("Sending commit Ack to ActiveMQ: {}", pendingTxAck);
 
-                sendToActiveMQ(pendingTxAck, new ResponseHandler() {
-                    @Override
-                    public void onResponse(IAmqpProtocolConverter converter, Response response) throws IOException {
-                        if (response.isException()) {
+                    sendToActiveMQ(pendingTxAck, new ResponseHandler() {
+                        @Override
+                        public void onResponse(IAmqpProtocolConverter converter, Response response) throws IOException {
                             if (response.isException()) {
-                                Throwable exception = ((ExceptionResponse) response).getException();
-                                exception.printStackTrace();
-                                sender.close();
+                                if (response.isException()) {
+                                    Throwable exception = ((ExceptionResponse) response).getException();
+                                    exception.printStackTrace();
+                                    sender.close();
+                                }
                             }
+                            pumpProtonToSocket();
                         }
-                        pumpProtonToSocket();
-                    }
-                });
+                    });
+                }
+
+                dispatchedInTx.clear();
             }
         }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/9edf907a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
index 8150bc0..2f2f5af 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
@@ -54,6 +54,8 @@ import org.slf4j.LoggerFactory;
 
 public class AmqpTestSupport {
 
+    public static final String MESSAGE_NUMBER = "MessageNumber";
+
     @Rule public TestName name = new TestName();
 
     protected static final Logger LOG = LoggerFactory.getLogger(AmqpTestSupport.class);
@@ -249,9 +251,10 @@ public class AmqpTestSupport {
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         MessageProducer p = session.createProducer(destination);
 
-        for (int i = 0; i < count; i++) {
+        for (int i = 1; i <= count; i++) {
             TextMessage message = session.createTextMessage();
             message.setText("TextMessage: " + i);
+            message.setIntProperty(MESSAGE_NUMBER, i);
             p.send(message);
         }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/9edf907a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
index 0380a87..ffa7b24 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
@@ -18,6 +18,9 @@ package org.apache.activemq.transport.amqp;
 
 import java.util.ArrayList;
 import java.util.Enumeration;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
@@ -140,7 +143,7 @@ public class JMSClientTest extends JMSClientTestSupport {
         final int msgCount = 1;
 
         connection = createConnection();
-        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
         Queue queue = session.createQueue(getDestinationName());
         sendMessages(connection, queue, msgCount);
 
@@ -170,7 +173,7 @@ public class JMSClientTest extends JMSClientTestSupport {
         final int msgCount = 1;
 
         connection = createConnection();
-        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
         Queue queue = session.createQueue(getDestinationName());
         sendMessages(connection, queue, msgCount);
 
@@ -206,6 +209,50 @@ public class JMSClientTest extends JMSClientTestSupport {
         session.close();
     }
 
+    @Test(timeout = 60000)
+    public void testRollbackSomeThenReceiveAndCommit() throws Exception {
+        int totalCount = 5;
+        int consumeBeforeRollback = 2;
+
+        connection = createConnection();
+        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        Queue queue = session.createQueue(getDestinationName());
+        sendMessages(connection, queue, totalCount);
+
+        QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
+        assertEquals(totalCount, proxy.getQueueSize());
+
+        MessageConsumer consumer = session.createConsumer(queue);
+
+        for(int i = 1; i <= consumeBeforeRollback; i++) {
+            Message message = consumer.receive(1000);
+            assertNotNull(message);
+            assertEquals("Unexpected message number", i, message.getIntProperty(AmqpTestSupport.MESSAGE_NUMBER));
+        }
+
+        session.rollback();
+
+        assertEquals(totalCount, proxy.getQueueSize());
+
+        // Consume again..check we receive all the messages.
+        Set<Integer> messageNumbers = new HashSet<Integer>();
+        for(int i = 1; i <= totalCount; i++) {
+            messageNumbers.add(i);
+        }
+
+        for(int i = 1; i <= totalCount; i++) {
+            Message message = consumer.receive(1000);
+            assertNotNull(message);
+            int msgNum = message.getIntProperty(AmqpTestSupport.MESSAGE_NUMBER);
+            messageNumbers.remove(msgNum);
+        }
+
+        session.commit();
+
+        assertTrue("Did not consume all expected messages, missing messages: " + messageNumbers, messageNumbers.isEmpty());
+        assertEquals("Queue should have no messages left after commit", 0, proxy.getQueueSize());
+    }
+
     @Test(timeout=60000)
     public void testTXConsumerAndLargeNumberOfMessages() throws Exception {
 
@@ -213,7 +260,7 @@ public class JMSClientTest extends JMSClientTestSupport {
         final int msgCount = 500;
 
         connection = createConnection();
-        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
         Queue queue = session.createQueue(getDestinationName());
         sendMessages(connection, queue, msgCount);
 
@@ -757,7 +804,7 @@ public class JMSClientTest extends JMSClientTestSupport {
         ActiveMQAdmin.enableJMSFrameTracing();
 
         connection = createConnection();
-        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
         Queue queue = session.createQueue(getDestinationName());
 
         connection.start();