You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2014/12/01 20:16:26 UTC
activemq git commit: https://issues.apache.org/jira/browse/AMQ-5467
Repository: activemq
Updated Branches:
refs/heads/trunk 9797d3b95 -> 9edf907ae
https://issues.apache.org/jira/browse/AMQ-5467
Apply patch to use individual acks for messages in a TX to avoid
unmatched acks when the ack range is non-sequential.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/9edf907a
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/9edf907a
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/9edf907a
Branch: refs/heads/trunk
Commit: 9edf907aedb8c0337577235467061aade237d72d
Parents: 9797d3b
Author: Timothy Bish <ta...@gmail.com>
Authored: Mon Dec 1 14:16:01 2014 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Mon Dec 1 14:16:01 2014 -0500
----------------------------------------------------------------------
.../transport/amqp/AmqpProtocolConverter.java | 36 ++++++-------
.../transport/amqp/AmqpTestSupport.java | 5 +-
.../activemq/transport/amqp/JMSClientTest.java | 55 ++++++++++++++++++--
3 files changed, 73 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/9edf907a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
index a08e08b..9a252f2 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
@@ -1203,29 +1203,29 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
@Override
void doCommit() throws Exception {
if (!dispatchedInTx.isEmpty()) {
+ for (MessageDispatch md : dispatchedInTx) {
+ MessageAck pendingTxAck = new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1);
+ pendingTxAck.setFirstMessageId(md.getMessage().getMessageId());
+ pendingTxAck.setTransactionId(md.getMessage().getTransactionId());
- MessageDispatch md = dispatchedInTx.getFirst();
- MessageAck pendingTxAck = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, dispatchedInTx.size());
- pendingTxAck.setTransactionId(md.getMessage().getTransactionId());
- pendingTxAck.setFirstMessageId(dispatchedInTx.getLast().getMessage().getMessageId());
-
- LOG.trace("Sending commit Ack to ActiveMQ: {}", pendingTxAck);
-
- dispatchedInTx.clear();
+ LOG.trace("Sending commit Ack to ActiveMQ: {}", pendingTxAck);
- sendToActiveMQ(pendingTxAck, new ResponseHandler() {
- @Override
- public void onResponse(IAmqpProtocolConverter converter, Response response) throws IOException {
- if (response.isException()) {
+ sendToActiveMQ(pendingTxAck, new ResponseHandler() {
+ @Override
+ public void onResponse(IAmqpProtocolConverter converter, Response response) throws IOException {
if (response.isException()) {
- Throwable exception = ((ExceptionResponse) response).getException();
- exception.printStackTrace();
- sender.close();
+ if (response.isException()) {
+ Throwable exception = ((ExceptionResponse) response).getException();
+ exception.printStackTrace();
+ sender.close();
+ }
}
+ pumpProtonToSocket();
}
- pumpProtonToSocket();
- }
- });
+ });
+ }
+
+ dispatchedInTx.clear();
}
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/9edf907a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
index 8150bc0..2f2f5af 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
@@ -54,6 +54,8 @@ import org.slf4j.LoggerFactory;
public class AmqpTestSupport {
+ public static final String MESSAGE_NUMBER = "MessageNumber";
+
@Rule public TestName name = new TestName();
protected static final Logger LOG = LoggerFactory.getLogger(AmqpTestSupport.class);
@@ -249,9 +251,10 @@ public class AmqpTestSupport {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer p = session.createProducer(destination);
- for (int i = 0; i < count; i++) {
+ for (int i = 1; i <= count; i++) {
TextMessage message = session.createTextMessage();
message.setText("TextMessage: " + i);
+ message.setIntProperty(MESSAGE_NUMBER, i);
p.send(message);
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/9edf907a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
index 0380a87..ffa7b24 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
@@ -18,6 +18,9 @@ package org.apache.activemq.transport.amqp;
import java.util.ArrayList;
import java.util.Enumeration;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@@ -140,7 +143,7 @@ public class JMSClientTest extends JMSClientTestSupport {
final int msgCount = 1;
connection = createConnection();
- Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue(getDestinationName());
sendMessages(connection, queue, msgCount);
@@ -170,7 +173,7 @@ public class JMSClientTest extends JMSClientTestSupport {
final int msgCount = 1;
connection = createConnection();
- Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue(getDestinationName());
sendMessages(connection, queue, msgCount);
@@ -206,6 +209,50 @@ public class JMSClientTest extends JMSClientTestSupport {
session.close();
}
+ @Test(timeout = 60000)
+ public void testRollbackSomeThenReceiveAndCommit() throws Exception {
+ int totalCount = 5;
+ int consumeBeforeRollback = 2;
+
+ connection = createConnection();
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ Queue queue = session.createQueue(getDestinationName());
+ sendMessages(connection, queue, totalCount);
+
+ QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
+ assertEquals(totalCount, proxy.getQueueSize());
+
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ for(int i = 1; i <= consumeBeforeRollback; i++) {
+ Message message = consumer.receive(1000);
+ assertNotNull(message);
+ assertEquals("Unexpected message number", i, message.getIntProperty(AmqpTestSupport.MESSAGE_NUMBER));
+ }
+
+ session.rollback();
+
+ assertEquals(totalCount, proxy.getQueueSize());
+
+ // Consume again..check we receive all the messages.
+ Set<Integer> messageNumbers = new HashSet<Integer>();
+ for(int i = 1; i <= totalCount; i++) {
+ messageNumbers.add(i);
+ }
+
+ for(int i = 1; i <= totalCount; i++) {
+ Message message = consumer.receive(1000);
+ assertNotNull(message);
+ int msgNum = message.getIntProperty(AmqpTestSupport.MESSAGE_NUMBER);
+ messageNumbers.remove(msgNum);
+ }
+
+ session.commit();
+
+ assertTrue("Did not consume all expected messages, missing messages: " + messageNumbers, messageNumbers.isEmpty());
+ assertEquals("Queue should have no messages left after commit", 0, proxy.getQueueSize());
+ }
+
@Test(timeout=60000)
public void testTXConsumerAndLargeNumberOfMessages() throws Exception {
@@ -213,7 +260,7 @@ public class JMSClientTest extends JMSClientTestSupport {
final int msgCount = 500;
connection = createConnection();
- Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue(getDestinationName());
sendMessages(connection, queue, msgCount);
@@ -757,7 +804,7 @@ public class JMSClientTest extends JMSClientTestSupport {
ActiveMQAdmin.enableJMSFrameTracing();
connection = createConnection();
- Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue(getDestinationName());
connection.start();