You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2017/06/02 18:22:14 UTC
[1/3] activemq git commit: AMQ-6697 Adds a test to show that the
described case works
Repository: activemq
Updated Branches:
refs/heads/activemq-5.14.x 908ef6472 -> e38ac94a2
AMQ-6697 Adds a test to show that the described case works
Correctly ACK inside a TX and then Abort and then ACK again outside a TX
to show that the broker will then mark the message as consumed.
(cherry picked from commit bd8661796b190ef605458cdd7a0d90d9af4f51a0)
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/1c141eae
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/1c141eae
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/1c141eae
Branch: refs/heads/activemq-5.14.x
Commit: 1c141eae409f87f5a2d8ffe1a5d8821882a614a0
Parents: 908ef64
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri Jun 2 11:50:14 2017 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Fri Jun 2 14:21:43 2017 -0400
----------------------------------------------------------------------
.../activemq/transport/stomp/Stomp11Test.java | 59 ++++++++++++++++++++
1 file changed, 59 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/1c141eae/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java
----------------------------------------------------------------------
diff --git a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java
index a707cf2..f61c899 100644
--- a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java
+++ b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java
@@ -35,6 +35,7 @@ import javax.jms.TextMessage;
import javax.management.ObjectName;
import org.apache.activemq.broker.jmx.BrokerViewMBean;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.util.Wait;
@@ -1128,4 +1129,62 @@ public class Stomp11Test extends StompTestSupport {
assertEquals(view.getDurableTopicSubscribers().length, 2);
assertEquals(view.getInactiveDurableTopicSubscribers().length, 0);
}
+
+ @Test(timeout = 60000)
+ public void testTransactionRollbackAllowsSecondAckOutsideTX() throws Exception {
+ MessageProducer producer = session.createProducer(queue);
+ producer.send(session.createTextMessage("Hello"));
+ producer.close();
+
+ String frame = "STOMP\n" + "login:system\n" + "passcode:manager\n" +
+ "accept-version:1.1\n" + "host:localhost\n" + "client-id:test\n" + "\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+
+ String f = stompConnection.receiveFrame();
+ assertTrue(f.startsWith("CONNECTED"));
+
+ QueueViewMBean queueView = getProxyToQueue(getQueueName());
+ assertEquals(1, queueView.getQueueSize());
+
+ frame = "BEGIN\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+
+ frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
+ "id:12345\n" + "ack:client\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+
+ StompFrame received = stompConnection.receive();
+ assertTrue(received.getAction().equals("MESSAGE"));
+
+ // ack it in the TX then abort
+ frame = "ACK\n" + "transaction: tx1\n" + "subscription:12345\n" + "message-id:" +
+ received.getHeaders().get("message-id") + "\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+
+ // rollback first message
+ frame = "ABORT\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+
+ assertEquals(1, queueView.getQueueSize());
+
+ // ack it outside the TX and it should be really ack'd
+ frame = "ACK\n" + "subscription:12345\n" + "message-id:" +
+ received.getHeaders().get("message-id") + "\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+
+ assertTrue("Message not ack'd", Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return queueView.getQueueSize() == 0;
+ }
+ }));
+
+ String unsub = "UNSUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
+ "receipt:1\n" + "id:12345\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(unsub);
+
+ String receipt = stompConnection.receiveFrame();
+ assertTrue(receipt.contains("RECEIPT"));
+ }
}
[2/3] activemq git commit: AMQ-6697 Preserve dispatched state on
client-individual tx ack
Posted by ta...@apache.org.
AMQ-6697 Preserve dispatched state on client-individual tx ack
Need to preserve the messages in the dispatched list when a
client-individual ack comes in so that on abort the state remains
dispatched and the message can still be ack'd
(cherry picked from commit e83bb6dc38ed793ead919e5d7d6d9146816c66a5)
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/0be8b63f
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/0be8b63f
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/0be8b63f
Branch: refs/heads/activemq-5.14.x
Commit: 0be8b63fde120aff68248cf0b10869c83ae7d21c
Parents: 1c141ea
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri Jun 2 14:19:44 2017 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Fri Jun 2 14:21:54 2017 -0400
----------------------------------------------------------------------
.../activemq/transport/stomp/StompSubscription.java | 16 +++++++---------
.../activemq/transport/stomp/Stomp11Test.java | 14 ++++++++++++--
2 files changed, 19 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/0be8b63f/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
index d4492e1..889b6f7 100644
--- a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
+++ b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
@@ -49,8 +49,8 @@ public class StompSubscription {
protected final String subscriptionId;
protected final ConsumerInfo consumerInfo;
- protected final LinkedHashMap<MessageId, MessageDispatch> dispatchedMessage = new LinkedHashMap<MessageId, MessageDispatch>();
- protected final LinkedList<MessageDispatch> unconsumedMessage = new LinkedList<MessageDispatch>();
+ protected final LinkedHashMap<MessageId, MessageDispatch> dispatchedMessage = new LinkedHashMap<>();
+ protected final LinkedList<MessageDispatch> unconsumedMessage = new LinkedList<>();
protected String ackMode = AUTO_ACK;
protected ActiveMQDestination destination;
@@ -65,15 +65,11 @@ public class StompSubscription {
void onMessageDispatch(MessageDispatch md, String ackId) throws IOException, JMSException {
ActiveMQMessage message = (ActiveMQMessage)md.getMessage();
- if (ackMode == CLIENT_ACK) {
- synchronized (this) {
- dispatchedMessage.put(message.getMessageId(), md);
- }
- } else if (ackMode == INDIVIDUAL_ACK) {
+ if (ackMode.equals(CLIENT_ACK) || ackMode.equals(INDIVIDUAL_ACK)) {
synchronized (this) {
dispatchedMessage.put(message.getMessageId(), md);
}
- } else if (ackMode == AUTO_ACK) {
+ } else if (ackMode.equals(AUTO_ACK)) {
MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
protocolConverter.getStompTransport().sendToActiveMQ(ack);
}
@@ -179,11 +175,13 @@ public class StompSubscription {
} else if (ackMode == INDIVIDUAL_ACK) {
ack.setAckType(MessageAck.INDIVIDUAL_ACK_TYPE);
ack.setMessageID(msgId);
+ ack.setMessageCount(1);
if (transactionId != null) {
unconsumedMessage.add(dispatchedMessage.get(msgId));
ack.setTransactionId(transactionId);
+ } else {
+ dispatchedMessage.remove(msgId);
}
- dispatchedMessage.remove(msgId);
}
return ack;
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/0be8b63f/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java
----------------------------------------------------------------------
diff --git a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java
index f61c899..5050399 100644
--- a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java
+++ b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java
@@ -1131,7 +1131,17 @@ public class Stomp11Test extends StompTestSupport {
}
@Test(timeout = 60000)
- public void testTransactionRollbackAllowsSecondAckOutsideTX() throws Exception {
+ public void testTransactionRollbackAllowsSecondAckOutsideTXClientAck() throws Exception {
+ doTestTransactionRollbackAllowsSecondAckOutsideTXClientAck("client");
+ }
+
+ @Test(timeout = 60000)
+ public void testTransactionRollbackAllowsSecondAckOutsideTXClientIndividualAck() throws Exception {
+ doTestTransactionRollbackAllowsSecondAckOutsideTXClientAck("client-individual");
+ }
+
+ public void doTestTransactionRollbackAllowsSecondAckOutsideTXClientAck(String ackMode) throws Exception {
+
MessageProducer producer = session.createProducer(queue);
producer.send(session.createTextMessage("Hello"));
producer.close();
@@ -1150,7 +1160,7 @@ public class Stomp11Test extends StompTestSupport {
stompConnection.sendFrame(frame);
frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
- "id:12345\n" + "ack:client\n\n" + Stomp.NULL;
+ "id:12345\n" + "ack:" + ackMode + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
StompFrame received = stompConnection.receive();
[3/3] activemq git commit: AMQ-6697 Make the MBean explicitly final
for java 7 support
Posted by ta...@apache.org.
AMQ-6697 Make the MBean explicitly final for java 7 support
(cherry picked from commit 8417ce537b2cb96965827ebfb793f31814ba1ddd)
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/e38ac94a
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/e38ac94a
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/e38ac94a
Branch: refs/heads/activemq-5.14.x
Commit: e38ac94a27347c89dddcd30c7a3e6063a69308dd
Parents: 0be8b63
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri Jun 2 14:20:56 2017 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Fri Jun 2 14:22:03 2017 -0400
----------------------------------------------------------------------
.../test/java/org/apache/activemq/transport/stomp/Stomp11Test.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/e38ac94a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java
----------------------------------------------------------------------
diff --git a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java
index 5050399..e12eaca 100644
--- a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java
+++ b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java
@@ -1153,7 +1153,7 @@ public class Stomp11Test extends StompTestSupport {
String f = stompConnection.receiveFrame();
assertTrue(f.startsWith("CONNECTED"));
- QueueViewMBean queueView = getProxyToQueue(getQueueName());
+ final QueueViewMBean queueView = getProxyToQueue(getQueueName());
assertEquals(1, queueView.getQueueSize());
frame = "BEGIN\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL;