You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2017/06/02 18:19:56 UTC

activemq git commit: AMQ-6697 Preserve dispatched state on client-individual tx ack

Repository: activemq
Updated Branches:
  refs/heads/master bd8661796 -> e83bb6dc3


AMQ-6697 Preserve dispatched state on client-individual tx ack

Need to preserve the messages in the dispatched list when a
client-individual ack comes in so that on abort the state remains
dispatched and the message can still be ack'd

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/e83bb6dc
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/e83bb6dc
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/e83bb6dc

Branch: refs/heads/master
Commit: e83bb6dc38ed793ead919e5d7d6d9146816c66a5
Parents: bd86617
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri Jun 2 14:19:44 2017 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Fri Jun 2 14:19:44 2017 -0400

----------------------------------------------------------------------
 .../activemq/transport/stomp/StompSubscription.java | 16 +++++++---------
 .../activemq/transport/stomp/Stomp11Test.java       | 14 ++++++++++++--
 2 files changed, 19 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/e83bb6dc/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
index d4492e1..889b6f7 100644
--- a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
+++ b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
@@ -49,8 +49,8 @@ public class StompSubscription {
     protected final String subscriptionId;
     protected final ConsumerInfo consumerInfo;
 
-    protected final LinkedHashMap<MessageId, MessageDispatch> dispatchedMessage = new LinkedHashMap<MessageId, MessageDispatch>();
-    protected final LinkedList<MessageDispatch> unconsumedMessage = new LinkedList<MessageDispatch>();
+    protected final LinkedHashMap<MessageId, MessageDispatch> dispatchedMessage = new LinkedHashMap<>();
+    protected final LinkedList<MessageDispatch> unconsumedMessage = new LinkedList<>();
 
     protected String ackMode = AUTO_ACK;
     protected ActiveMQDestination destination;
@@ -65,15 +65,11 @@ public class StompSubscription {
 
     void onMessageDispatch(MessageDispatch md, String ackId) throws IOException, JMSException {
         ActiveMQMessage message = (ActiveMQMessage)md.getMessage();
-        if (ackMode == CLIENT_ACK) {
-            synchronized (this) {
-                dispatchedMessage.put(message.getMessageId(), md);
-            }
-        } else if (ackMode == INDIVIDUAL_ACK) {
+        if (ackMode.equals(CLIENT_ACK) || ackMode.equals(INDIVIDUAL_ACK)) {
             synchronized (this) {
                 dispatchedMessage.put(message.getMessageId(), md);
             }
-        } else if (ackMode == AUTO_ACK) {
+        } else if (ackMode.equals(AUTO_ACK)) {
             MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
             protocolConverter.getStompTransport().sendToActiveMQ(ack);
         }
@@ -179,11 +175,13 @@ public class StompSubscription {
         } else if (ackMode == INDIVIDUAL_ACK) {
             ack.setAckType(MessageAck.INDIVIDUAL_ACK_TYPE);
             ack.setMessageID(msgId);
+            ack.setMessageCount(1);
             if (transactionId != null) {
                 unconsumedMessage.add(dispatchedMessage.get(msgId));
                 ack.setTransactionId(transactionId);
+            } else {
+                dispatchedMessage.remove(msgId);
             }
-            dispatchedMessage.remove(msgId);
         }
         return ack;
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/e83bb6dc/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java
----------------------------------------------------------------------
diff --git a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java
index f61c899..5050399 100644
--- a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java
+++ b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java
@@ -1131,7 +1131,17 @@ public class Stomp11Test extends StompTestSupport {
     }
 
     @Test(timeout = 60000)
-    public void testTransactionRollbackAllowsSecondAckOutsideTX() throws Exception {
+    public void testTransactionRollbackAllowsSecondAckOutsideTXClientAck() throws Exception {
+        doTestTransactionRollbackAllowsSecondAckOutsideTXClientAck("client");
+    }
+
+    @Test(timeout = 60000)
+    public void testTransactionRollbackAllowsSecondAckOutsideTXClientIndividualAck() throws Exception {
+        doTestTransactionRollbackAllowsSecondAckOutsideTXClientAck("client-individual");
+    }
+
+    public void doTestTransactionRollbackAllowsSecondAckOutsideTXClientAck(String ackMode) throws Exception {
+
         MessageProducer producer = session.createProducer(queue);
         producer.send(session.createTextMessage("Hello"));
         producer.close();
@@ -1150,7 +1160,7 @@ public class Stomp11Test extends StompTestSupport {
         stompConnection.sendFrame(frame);
 
         frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
-            "id:12345\n" + "ack:client\n\n" + Stomp.NULL;
+            "id:12345\n" + "ack:" + ackMode + "\n\n" + Stomp.NULL;
         stompConnection.sendFrame(frame);
 
         StompFrame received = stompConnection.receive();