You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2019/05/29 21:25:50 UTC

[activemq] branch master updated: AMQ-7218 Fix loss of Ack id from tracking after TX commit / abort

This is an automated email from the ASF dual-hosted git repository.

tabish pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/master by this push:
     new 063d24e  AMQ-7218 Fix loss of Ack id from tracking after TX commit / abort
063d24e is described below

commit 063d24e6d653bed3dd3c56302758de29143e5e82
Author: Timothy Bish <ta...@gmail.com>
AuthorDate: Wed May 29 17:25:30 2019 -0400

    AMQ-7218 Fix loss of Ack id from tracking after TX commit / abort
    
    Ensure that we properly track Ack IDs across TX commit and abort
    operations and only clear out values enlisted in the TX on commit and
    re-acquire the Ack Ids on TX abort.
---
 .../transport/stomp/ProtocolConverter.java         |  96 +---
 .../activemq/transport/stomp/StompAckEntry.java    | 101 ++++
 .../stomp/StompQueueBrowserSubscription.java       |  11 +-
 .../transport/stomp/StompSubscription.java         | 133 +++--
 .../activemq/transport/stomp/Stomp12Test.java      | 599 ++++++++++++++++++++-
 5 files changed, 801 insertions(+), 139 deletions(-)

diff --git a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
index 39b6d09..b3deac4 100644
--- a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
+++ b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
@@ -22,7 +22,7 @@ import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.OutputStreamWriter;
 import java.io.PrintWriter;
-import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
@@ -116,8 +116,10 @@ public class ProtocolConverter {
     private final Map<String, LocalTransactionId> transactions = new ConcurrentHashMap<>();
     private final StompTransport stompTransport;
 
-    private final ConcurrentMap<String, AckEntry> pedingAcks = new ConcurrentHashMap<>();
-    private final IdGenerator ACK_ID_GENERATOR = new IdGenerator();
+    // Global Map shared with all subscriptions to allow finding the sub associated with an ACK Id
+    private final ConcurrentMap<String, StompAckEntry> pendingAcksTracker = new ConcurrentHashMap<>();
+    // Read-Only view used in this class to enforce the separation of read vs update of the global index.
+    private final Map<String, StompAckEntry> pendingAcks = Collections.unmodifiableMap(pendingAcksTracker);
 
     private final Object commnadIdMutex = new Object();
     private int lastCommandId;
@@ -131,34 +133,6 @@ public class ProtocolConverter {
     private float hbGracePeriodMultiplier = 1.0f;
     private String defaultHeartBeat = Stomp.DEFAULT_HEART_BEAT;
 
-    private static class AckEntry {
-
-        private final String messageId;
-        private final StompSubscription subscription;
-
-        public AckEntry(String messageId, StompSubscription subscription) {
-            this.messageId = messageId;
-            this.subscription = subscription;
-        }
-
-        public MessageAck onMessageAck(TransactionId transactionId) {
-            return subscription.onStompMessageAck(messageId, transactionId);
-        }
-
-        public MessageAck onMessageNack(TransactionId transactionId) throws ProtocolException {
-            return subscription.onStompMessageNack(messageId, transactionId);
-        }
-
-        public String getMessageId() {
-            return this.messageId;
-        }
-
-        @SuppressWarnings("unused")
-        public StompSubscription getSubscription() {
-            return this.subscription;
-        }
-    }
-
     public ProtocolConverter(StompTransport stompTransport, BrokerContext brokerContext) {
         this.stompTransport = stompTransport;
         this.brokerContext = brokerContext;
@@ -387,9 +361,9 @@ public class ProtocolConverter {
         boolean nacked = false;
 
         if (ackId != null) {
-            AckEntry pendingAck = this.pedingAcks.remove(ackId);
+            StompAckEntry pendingAck = this.pendingAcks.get(ackId);
             if (pendingAck != null) {
-                messageId = pendingAck.getMessageId();
+                messageId = pendingAck.getMessageId().toString();
                 MessageAck ack = pendingAck.onMessageNack(activemqTx);
                 if (ack != null) {
                     sendToActiveMQ(ack, createResponseHandler(command));
@@ -443,9 +417,9 @@ public class ProtocolConverter {
         boolean acked = false;
 
         if (ackId != null) {
-            AckEntry pendingAck = this.pedingAcks.remove(ackId);
+            StompAckEntry pendingAck = this.pendingAcks.get(ackId);
             if (pendingAck != null) {
-                messageId = pendingAck.getMessageId();
+                messageId = pendingAck.getMessageId().toString();
                 MessageAck ack = pendingAck.onMessageAck(activemqTx);
                 if (ack != null) {
                     sendToActiveMQ(ack, createResponseHandler(command));
@@ -526,8 +500,6 @@ public class ProtocolConverter {
             sub.onStompCommit(activemqTx);
         }
 
-        pedingAcks.clear();
-
         TransactionInfo tx = new TransactionInfo();
         tx.setConnectionId(connectionId);
         tx.setTransactionId(activemqTx);
@@ -557,8 +529,6 @@ public class ProtocolConverter {
             }
         }
 
-        pedingAcks.clear();
-
         TransactionInfo tx = new TransactionInfo();
         tx.setConnectionId(connectionId);
         tx.setTransactionId(activemqTx);
@@ -624,9 +594,9 @@ public class ProtocolConverter {
 
         StompSubscription stompSubscription;
         if (!consumerInfo.isBrowser()) {
-            stompSubscription = new StompSubscription(this, subscriptionId, consumerInfo, headers.get(Stomp.Headers.TRANSFORMATION));
+            stompSubscription = new StompSubscription(this, subscriptionId, consumerInfo, headers.get(Stomp.Headers.TRANSFORMATION), pendingAcksTracker);
         } else {
-            stompSubscription = new StompQueueBrowserSubscription(this, subscriptionId, consumerInfo, headers.get(Stomp.Headers.TRANSFORMATION));
+            stompSubscription = new StompQueueBrowserSubscription(this, subscriptionId, consumerInfo, headers.get(Stomp.Headers.TRANSFORMATION), pendingAcksTracker);
         }
         stompSubscription.setDestination(actualDest);
 
@@ -845,6 +815,7 @@ public class ProtocolConverter {
 
     protected void onStompDisconnect(StompFrame command) throws ProtocolException {
         if (connected.get()) {
+            LOG.trace("Connection closed with {} pending ACKs still being tracked.", pendingAcks.size());
             sendToActiveMQ(connectionInfo.createRemoveCommand(), createResponseHandler(command));
             sendToActiveMQ(new ShutdownInfo(), createResponseHandler(command));
             connected.set(false);
@@ -880,19 +851,7 @@ public class ProtocolConverter {
             MessageDispatch md = (MessageDispatch)command;
             StompSubscription sub = subscriptionsByConsumerId.get(md.getConsumerId());
             if (sub != null) {
-                String ackId = null;
-                if (version.equals(Stomp.V1_2) && sub.getAckMode() != Stomp.Headers.Subscribe.AckModeValues.AUTO && md.getMessage() != null) {
-                    AckEntry pendingAck = new AckEntry(md.getMessage().getMessageId().toString(), sub);
-                    ackId = this.ACK_ID_GENERATOR.generateId();
-                    this.pedingAcks.put(ackId, pendingAck);
-                }
-                try {
-                    sub.onMessageDispatch(md, ackId);
-                } catch (Exception ex) {
-                    if (ackId != null) {
-                        this.pedingAcks.remove(ackId);
-                    }
-                }
+                sub.onMessageDispatch(md);
             }
         } else if (command.getDataStructureType() == CommandTypes.KEEP_ALIVE_INFO) {
             stompTransport.sendToStomp(ping);
@@ -1052,26 +1011,15 @@ public class ProtocolConverter {
         return result;
     }
 
-    /**
-     * Remove all pending acknowledgement markers that are batched into the single
-     * client acknowledge operation.
-     *
-     * @param subscription
-     *      The STOMP Subscription that has performed a client acknowledge.
-     * @param msgIdsToRemove
-     *      List of message IDs that are bound to the subscription that has ack'd
-     */
-    protected void afterClientAck(StompSubscription subscription, ArrayList<String> msgIdsToRemove) {
-        int count = 0;
-
-        for (Map.Entry<String,AckEntry> entry : this.pedingAcks.entrySet()){
-            AckEntry actEntry = entry.getValue();
-            if (msgIdsToRemove.contains(actEntry.messageId)) {
-                this.pedingAcks.remove(entry.getKey());
-                count++;
-            }
-        }
+    boolean isStomp10() {
+        return version.equals(Stomp.V1_0);
+    }
+
+    boolean isStomp11() {
+        return version.equals(Stomp.V1_1);
+    }
 
-        LOG.trace("Subscription:[{}] client acknowledged {} messages", subscription.getSubscriptionId(), count);
+    boolean isStomp12() {
+        return version.equals(Stomp.V1_2);
     }
 }
diff --git a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompAckEntry.java b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompAckEntry.java
new file mode 100644
index 0000000..1edcb62
--- /dev/null
+++ b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompAckEntry.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp;
+
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.TransactionId;
+
+/**
+ * Tracker object for Messages that carry STOMP v1.2 ACK IDs
+ */
+public class StompAckEntry {
+
+    private final String ackId;
+    private final MessageId messageId;
+    private final StompSubscription subscription;
+    private final MessageDispatch dispatch;
+
+    public StompAckEntry(MessageDispatch dispatch, String ackId, StompSubscription subscription) {
+        this.messageId = dispatch.getMessage().getMessageId();
+        this.subscription = subscription;
+        this.ackId = ackId;
+        this.dispatch = dispatch;
+    }
+
+    public MessageAck onMessageAck(TransactionId transactionId) {
+        return subscription.onStompMessageAck(messageId.toString(), transactionId);
+    }
+
+    public MessageAck onMessageNack(TransactionId transactionId) throws ProtocolException {
+        return subscription.onStompMessageNack(messageId.toString(), transactionId);
+    }
+
+    public MessageId getMessageId() {
+        return this.messageId;
+    }
+
+    public MessageDispatch getMessageDispatch() {
+        return this.dispatch;
+    }
+
+    public String getAckId() {
+        return this.ackId;
+    }
+
+    public StompSubscription getSubscription() {
+        return this.subscription;
+    }
+
+    @Override
+    public String toString() {
+        return "AckEntry[ msgId:" + messageId + ", ackId:" + ackId + ", sub:" + subscription + " ]";
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((messageId == null) ? 0 : messageId.hashCode());
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null) {
+            return false;
+        }
+        if (getClass() != obj.getClass()) {
+            return false;
+        }
+
+        StompAckEntry other = (StompAckEntry) obj;
+        if (messageId == null) {
+            if (other.messageId != null) {
+                return false;
+            }
+        } else if (!messageId.equals(other.messageId)) {
+            return false;
+        }
+
+        return true;
+    }
+}
diff --git a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompQueueBrowserSubscription.java b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompQueueBrowserSubscription.java
index 9e267ac..1238572 100644
--- a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompQueueBrowserSubscription.java
+++ b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompQueueBrowserSubscription.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.transport.stomp;
 
 import java.io.IOException;
+import java.util.Map;
 
 import javax.jms.JMSException;
 
@@ -27,15 +28,14 @@ import org.apache.activemq.command.TransactionId;
 
 public class StompQueueBrowserSubscription extends StompSubscription {
 
-    public StompQueueBrowserSubscription(ProtocolConverter stompTransport, String subscriptionId, ConsumerInfo consumerInfo, String transformation) {
-        super(stompTransport, subscriptionId, consumerInfo, transformation);
+    public StompQueueBrowserSubscription(ProtocolConverter stompTransport, String subscriptionId, ConsumerInfo consumerInfo, String transformation, Map<String, StompAckEntry> pendingAcks) {
+        super(stompTransport, subscriptionId, consumerInfo, transformation, pendingAcks);
     }
 
     @Override
-    void onMessageDispatch(MessageDispatch md, String ackId) throws IOException, JMSException {
-
+    void onMessageDispatch(MessageDispatch md) throws IOException, JMSException {
         if (md.getMessage() != null) {
-            super.onMessageDispatch(md, ackId);
+            super.onMessageDispatch(md);
         } else {
             StompFrame browseDone = new StompFrame(Stomp.Responses.MESSAGE);
             browseDone.getHeaders().put(Stomp.Headers.Message.SUBSCRIPTION, this.getSubscriptionId());
@@ -52,5 +52,4 @@ public class StompQueueBrowserSubscription extends StompSubscription {
     public MessageAck onStompMessageNack(String messageId, TransactionId transactionId) throws ProtocolException {
         throw new ProtocolException("Cannot Nack a message on a Queue Browser Subscription.");
     }
-
 }
diff --git a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
index 0d8e308..95fe986 100644
--- a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
+++ b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
@@ -17,12 +17,10 @@
 package org.apache.activemq.transport.stomp;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.Map;
-import java.util.Map.Entry;
 
 import javax.jms.JMSException;
 
@@ -34,6 +32,9 @@ import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageDispatch;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.util.IdGenerator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Keeps track of the STOMP subscription so that acking is correctly done.
@@ -42,6 +43,10 @@ import org.apache.activemq.command.TransactionId;
  */
 public class StompSubscription {
 
+    private static final Logger LOG = LoggerFactory.getLogger(StompSubscription.class);
+
+    private static final IdGenerator ACK_ID_GENERATOR = new IdGenerator();
+
     public static final String AUTO_ACK = Stomp.Headers.Subscribe.AckModeValues.AUTO;
     public static final String CLIENT_ACK = Stomp.Headers.Subscribe.AckModeValues.CLIENT;
     public static final String INDIVIDUAL_ACK = Stomp.Headers.Subscribe.AckModeValues.INDIVIDUAL;
@@ -50,27 +55,37 @@ public class StompSubscription {
     protected final String subscriptionId;
     protected final ConsumerInfo consumerInfo;
 
-    protected final LinkedHashMap<MessageId, MessageDispatch> dispatchedMessage = new LinkedHashMap<>();
-    protected final LinkedList<MessageDispatch> unconsumedMessage = new LinkedList<>();
+    protected final Map<MessageId, StompAckEntry> dispatchedMessage = new LinkedHashMap<>();
+    protected final Map<String, StompAckEntry> pendingAcks; // STOMP v1.2 requires ACK ID tracking
+    protected final LinkedList<StompAckEntry> transactedMessages = new LinkedList<>();
 
     protected String ackMode = AUTO_ACK;
     protected ActiveMQDestination destination;
     protected String transformation;
 
-    public StompSubscription(ProtocolConverter stompTransport, String subscriptionId, ConsumerInfo consumerInfo, String transformation) {
+    public StompSubscription(ProtocolConverter stompTransport, String subscriptionId, ConsumerInfo consumerInfo, String transformation, Map<String, StompAckEntry> pendingAcks) {
         this.protocolConverter = stompTransport;
         this.subscriptionId = subscriptionId;
         this.consumerInfo = consumerInfo;
         this.transformation = transformation;
+        this.pendingAcks = pendingAcks;
     }
 
-    void onMessageDispatch(MessageDispatch md, String ackId) throws IOException, JMSException {
+    void onMessageDispatch(MessageDispatch md) throws IOException, JMSException {
         ActiveMQMessage message = (ActiveMQMessage)md.getMessage();
-        if (ackMode.equals(CLIENT_ACK) || ackMode.equals(INDIVIDUAL_ACK)) {
+
+        String ackId = null;
+        if (isClientAck() || isIndividualAck()) {
+            ackId = ACK_ID_GENERATOR.generateId();
+            StompAckEntry pendingAck = new StompAckEntry(md, ackId, this);
+
             synchronized (this) {
-                dispatchedMessage.put(message.getMessageId(), md);
+                dispatchedMessage.put(message.getMessageId(), pendingAck);
             }
-        } else if (ackMode.equals(AUTO_ACK)) {
+            if (protocolConverter.isStomp12()) {
+                this.pendingAcks.put(ackId, pendingAck);
+            }
+        } else if (isAutoAck()) {
             MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
             protocolConverter.getStompTransport().sendToActiveMQ(ack);
         }
@@ -93,35 +108,48 @@ public class StompSubscription {
             command.getHeaders().put(Stomp.Headers.Message.SUBSCRIPTION, subscriptionId);
         }
 
-        if (ackId != null) {
+        if (protocolConverter.isStomp12() && ackId != null) {
             command.getHeaders().put(Stomp.Headers.Message.ACK_ID, ackId);
         }
 
-        protocolConverter.getStompTransport().sendToStomp(command);
+        try {
+            protocolConverter.getStompTransport().sendToStomp(command);
+        } catch (IOException ex) {
+            if (ackId != null) {
+                pendingAcks.remove(ackId);
+            }
+            throw ex;
+        }
     }
 
     synchronized void onStompAbort(TransactionId transactionId) {
-        unconsumedMessage.clear();
+        // Restore the pending ACKs so that their ACK IDs are again valid for a client
+        // to operate on.
+        LOG.trace("Transaction Abort restoring {} pending ACKs to valid state.", transactedMessages.size());
+        for (StompAckEntry ackEntry : transactedMessages) {
+            if (protocolConverter.isStomp12()) {
+                pendingAcks.put(ackEntry.getAckId(), ackEntry);
+            }
+        }
+        transactedMessages.clear();
     }
 
     void onStompCommit(TransactionId transactionId) {
         MessageAck ack = null;
         synchronized (this) {
-            for (Iterator<?> iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) {
-                @SuppressWarnings("rawtypes")
-                Map.Entry entry = (Entry)iter.next();
-                MessageDispatch msg = (MessageDispatch)entry.getValue();
-                if (unconsumedMessage.contains(msg)) {
-                    iter.remove();
+            for (Iterator<StompAckEntry> iterator = dispatchedMessage.values().iterator(); iterator.hasNext();) {
+                StompAckEntry ackEntry = iterator.next();
+                if (transactedMessages.contains(ackEntry)) {
+                    iterator.remove();
                 }
             }
 
             // For individual Ack we already sent an Ack that will be applied on commit
             // we don't send a second standard Ack as that would produce an error.
-            if (!unconsumedMessage.isEmpty() && ackMode == CLIENT_ACK) {
-                ack = new MessageAck(unconsumedMessage.getLast(), MessageAck.STANDARD_ACK_TYPE, unconsumedMessage.size());
+            if (!transactedMessages.isEmpty() && isClientAck()) {
+                ack = new MessageAck(transactedMessages.getLast().getMessageDispatch(), MessageAck.STANDARD_ACK_TYPE, transactedMessages.size());
                 ack.setTransactionId(transactionId);
-                unconsumedMessage.clear();
+                transactedMessages.clear();
             }
         }
         // avoid contention with onMessageDispatch
@@ -131,10 +159,10 @@ public class StompSubscription {
     }
 
     synchronized MessageAck onStompMessageAck(String messageId, TransactionId transactionId) {
-
         MessageId msgId = new MessageId(messageId);
 
-        if (!dispatchedMessage.containsKey(msgId)) {
+        final StompAckEntry ackEntry = dispatchedMessage.get(msgId);
+        if (ackEntry == null) {
             return null;
         }
 
@@ -142,35 +170,33 @@ public class StompSubscription {
         ack.setDestination(consumerInfo.getDestination());
         ack.setConsumerId(consumerInfo.getConsumerId());
 
-        final ArrayList<String> acknowledgedMessages = new ArrayList<>();
-
-        if (ackMode == CLIENT_ACK) {
+        if (isClientAck()) {
             if (transactionId == null) {
                 ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
             } else {
                 ack.setAckType(MessageAck.DELIVERED_ACK_TYPE);
             }
             int count = 0;
-            for (Iterator<?> iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) {
+            for (Iterator<StompAckEntry> iterator = dispatchedMessage.values().iterator(); iterator.hasNext();) {
+                StompAckEntry entry = iterator.next();
+                MessageId current = entry.getMessageId();
 
-                @SuppressWarnings("rawtypes")
-                Map.Entry entry = (Entry)iter.next();
-                MessageId id = (MessageId)entry.getKey();
-                MessageDispatch msg = (MessageDispatch)entry.getValue();
+                if (entry.getAckId() != null) {
+                    pendingAcks.remove(entry.getAckId());
+                }
 
                 if (transactionId != null) {
-                    if (!unconsumedMessage.contains(msg)) {
-                        unconsumedMessage.add(msg);
+                    if (!transactedMessages.contains(entry)) {
+                        transactedMessages.add(entry);
                         count++;
                     }
                 } else {
-                    acknowledgedMessages.add(id.toString());
-                    iter.remove();
+                    iterator.remove();
                     count++;
                 }
 
-                if (id.equals(msgId)) {
-                    ack.setLastMessageId(id);
+                if (current.equals(msgId)) {
+                    ack.setLastMessageId(current);
                     break;
                 }
             }
@@ -178,14 +204,15 @@ public class StompSubscription {
             if (transactionId != null) {
                 ack.setTransactionId(transactionId);
             }
-
-            this.protocolConverter.afterClientAck(this, acknowledgedMessages);
-        } else if (ackMode == INDIVIDUAL_ACK) {
+        } else if (isIndividualAck()) {
+            if (ackEntry.getAckId() != null) {
+                pendingAcks.remove(ackEntry.getAckId());
+            }
             ack.setAckType(MessageAck.INDIVIDUAL_ACK_TYPE);
             ack.setMessageID(msgId);
             ack.setMessageCount(1);
             if (transactionId != null) {
-                unconsumedMessage.add(dispatchedMessage.get(msgId));
+                transactedMessages.add(dispatchedMessage.get(msgId));
                 ack.setTransactionId(transactionId);
             } else {
                 dispatchedMessage.remove(msgId);
@@ -196,23 +223,29 @@ public class StompSubscription {
     }
 
     public MessageAck onStompMessageNack(String messageId, TransactionId transactionId) throws ProtocolException {
-
         MessageId msgId = new MessageId(messageId);
 
         if (!dispatchedMessage.containsKey(msgId)) {
             return null;
         }
 
+        final StompAckEntry ackEntry = dispatchedMessage.get(msgId);
+
+        if (ackEntry.getAckId() != null) {
+            pendingAcks.remove(ackEntry.getAckId());
+        }
+
         MessageAck ack = new MessageAck();
         ack.setDestination(consumerInfo.getDestination());
         ack.setConsumerId(consumerInfo.getConsumerId());
         ack.setAckType(MessageAck.POSION_ACK_TYPE);
         ack.setMessageID(msgId);
         if (transactionId != null) {
-            unconsumedMessage.add(dispatchedMessage.get(msgId));
+            transactedMessages.add(ackEntry);
             ack.setTransactionId(transactionId);
+        } else {
+            dispatchedMessage.remove(msgId);
         }
-        dispatchedMessage.remove(msgId);
 
         return ack;
     }
@@ -225,6 +258,18 @@ public class StompSubscription {
         this.ackMode = ackMode;
     }
 
+    public boolean isAutoAck() {
+        return ackMode.equals(AUTO_ACK);
+    }
+
+    public boolean isClientAck() {
+        return ackMode.equals(CLIENT_ACK);
+    }
+
+    public boolean isIndividualAck() {
+        return ackMode.equals(INDIVIDUAL_ACK);
+    }
+
     public String getSubscriptionId() {
         return subscriptionId;
     }
diff --git a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12Test.java b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12Test.java
index b7560c7..3944c50 100644
--- a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12Test.java
+++ b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12Test.java
@@ -21,7 +21,9 @@ import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.net.Socket;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 import javax.jms.Connection;
@@ -40,7 +42,6 @@ public class Stomp12Test extends StompTestSupport {
 
     @Override
     public void setUp() throws Exception {
-
         super.setUp();
 
         stompConnect();
@@ -70,7 +71,6 @@ public class Stomp12Test extends StompTestSupport {
 
     @Test(timeout = 60000)
     public void testTelnetStyleSends() throws Exception {
-
         stompConnection.setVersion(Stomp.V1_2);
 
         String connect = "CONNECT\r\n" +
@@ -107,7 +107,6 @@ public class Stomp12Test extends StompTestSupport {
 
     @Test(timeout = 60000)
     public void testClientAckWithoutAckId() throws Exception {
-
         stompConnection.setVersion(Stomp.V1_2);
 
         String connect = "STOMP\r\n" +
@@ -150,18 +149,40 @@ public class Stomp12Test extends StompTestSupport {
         assertTrue(received.getHeaders().containsKey(Stomp.Headers.Message.ACK_ID));
         assertEquals("1", received.getBody());
 
-        String frame = "ACK\n" + "message-id:" +
-                received.getHeaders().get(Stomp.Headers.Message.ACK_ID) + "\n\n" + Stomp.NULL;
+        String ackId = received.getHeaders().get(Stomp.Headers.Message.ACK_ID);
+
+        // Put ACK ID in wrong header
+        String frame = "ACK\n" + "message-id:" + ackId + "\n\n" + Stomp.NULL;
         stompConnection.sendFrame(frame);
 
         received = stompConnection.receive();
         assertTrue(received.getAction().equals("ERROR"));
         LOG.info("Broker sent: " + received);
+
+        // Now place it in the correct location and check it still works.
+        frame = "ACK\n" + "id:" + ackId + "\n" + "receipt:2\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        receipt = stompConnection.receive();
+        LOG.info("Broker sent: " + receipt);
+        assertTrue(receipt.getAction().startsWith("RECEIPT"));
+        receiptId = receipt.getHeaders().get("receipt-id");
+        assertEquals("2", receiptId);
+
+        assertTrue(Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return getProxyToQueue(getQueueName()).getQueueSize() == 0;
+            }
+        }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25)));
+
+        frame = "DISCONNECT\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
     }
 
     @Test(timeout = 60000)
     public void testClientAck() throws Exception {
-
         stompConnection.setVersion(Stomp.V1_2);
 
         String connect = "STOMP\r\n" +
@@ -255,11 +276,106 @@ public class Stomp12Test extends StompTestSupport {
         frame = "ACK\n" + "id:" +
                 received.getHeaders().get(Stomp.Headers.Message.ACK_ID) + "\n\n" + Stomp.NULL;
         stompConnection.sendFrame(frame);
+
+        assertTrue(Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return getProxyToQueue(getQueueName()).getQueueSize() == 0;
+            }
+        }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25)));
+
+        frame = "DISCONNECT\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
     }
 
     @Test(timeout = 60000)
-    public void testClientIndividualAck() throws Exception {
+    public void testClientAckMultipleMessagesWithSingleAck() throws Exception {
+        final int MESSAGE_COUNT = 10;
+
+        stompConnection.setVersion(Stomp.V1_2);
+
+        String connect = "STOMP\r\n" +
+                         "accept-version:1.2\r\n" +
+                         "login:system\r\n" +
+                         "passcode:manager\r\n" +
+                         "\r\n" +
+                         "\u0000\r\n";
+
+        stompConnection.sendFrame(connect);
+
+        String f = stompConnection.receiveFrame();
+        LOG.info("Broker sent: " + f);
+
+        assertTrue(f.startsWith("CONNECTED"));
+        assertTrue(f.indexOf("version:1.2") >= 0);
+        assertTrue(f.indexOf("session:") >= 0);
 
+        // Send some messages
+        for (int n = 0; n < MESSAGE_COUNT; n++) {
+            String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + String.format("%d", n) + Stomp.NULL;
+            stompConnection.sendFrame(message);
+        }
+
+        String subscribe = "SUBSCRIBE\n" +
+                           "id:1\n" +
+                           "ack:client\n" +
+                           "destination:/queue/" + getQueueName() + "\n" +
+                           "receipt:1\n" +
+                           "\n" + Stomp.NULL;
+
+        stompConnection.sendFrame(subscribe);
+
+        StompFrame receipt = stompConnection.receive();
+        LOG.info("Broker sent: " + receipt);
+        assertTrue(receipt.getAction().startsWith("RECEIPT"));
+        String receiptId = receipt.getHeaders().get("receipt-id");
+        assertEquals("1", receiptId);
+        assertEquals(MESSAGE_COUNT, getProxyToQueue(getQueueName()).getQueueSize());
+
+        String lastAckId = null;
+
+        for (int n = 0; n < MESSAGE_COUNT; n++) {
+            StompFrame received = stompConnection.receive();
+            LOG.info("Broker sent: " + received);
+            assertTrue(received.getAction().equals("MESSAGE"));
+            assertTrue(received.getHeaders().containsKey(Stomp.Headers.Message.ACK_ID));
+            assertEquals(String.format("%d", n), received.getBody());
+
+            lastAckId = received.getHeaders().get(Stomp.Headers.Message.ACK_ID);
+        }
+
+        String frame = "ACK\n" + "id:" +  lastAckId + "\n" + "receipt:2" + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        receipt = stompConnection.receive();
+        LOG.info("Broker sent: " + receipt);
+        assertTrue(receipt.getAction().startsWith("RECEIPT"));
+        receiptId = receipt.getHeaders().get("receipt-id");
+        assertEquals("2", receiptId);
+
+        assertTrue(Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return getProxyToQueue(getQueueName()).getQueueSize() == 0;
+            }
+        }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25)));
+
+        frame = "DISCONNECT\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        assertTrue(Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return getProxyToBroker().getCurrentConnectionsCount() == 1;
+            }
+        }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25)));
+    }
+
+    @Test(timeout = 60000)
+    public void testClientIndividualAck() throws Exception {
         stompConnection.setVersion(Stomp.V1_2);
 
         String connect = "STOMP\r\n" +
@@ -345,24 +461,117 @@ public class Stomp12Test extends StompTestSupport {
         assertTrue(received.getAction().equals("MESSAGE"));
         assertTrue(received.getHeaders().containsKey(Stomp.Headers.Message.ACK_ID));
         assertEquals("1", received.getBody());
+        String message1AckId = received.getHeaders().get(Stomp.Headers.Message.ACK_ID);
 
-        frame = "ACK\n" + "id:" +
-                received.getHeaders().get(Stomp.Headers.Message.ACK_ID) + "\n\n" + Stomp.NULL;
+        frame = "ACK\n" + "id:" + message1AckId + "\n\n" + Stomp.NULL;
         stompConnection.sendFrame(frame);
 
         received = stompConnection.receive();
         assertTrue(received.getAction().equals("MESSAGE"));
         assertTrue(received.getHeaders().containsKey(Stomp.Headers.Message.ACK_ID));
         assertEquals("3", received.getBody());
+        String message3AckId = received.getHeaders().get(Stomp.Headers.Message.ACK_ID);
 
-        frame = "ACK\n" + "id:" +
-                received.getHeaders().get(Stomp.Headers.Message.ACK_ID) + "\n\n" + Stomp.NULL;
+        frame = "ACK\n" + "id:" + message3AckId + "\n" + "receipt:2" + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        receipt = stompConnection.receive();
+        LOG.info("Broker sent: " + receipt);
+        assertTrue(receipt.getAction().startsWith("RECEIPT"));
+        receiptId = receipt.getHeaders().get("receipt-id");
+        assertEquals("2", receiptId);
+
+        assertTrue(Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return getProxyToQueue(getQueueName()).getQueueSize() == 0;
+            }
+        }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25)));
+
+        frame = "DISCONNECT\n\n" + Stomp.NULL;
         stompConnection.sendFrame(frame);
     }
 
     @Test(timeout = 60000)
-    public void testQueueBrowerSubscription() throws Exception {
+    public void testRepeatedClientIndividualAckInMultipleTransactions() throws Exception {
+        final int MESSAGE_COUNT = 50;
+
+        stompConnection.setVersion(Stomp.V1_2);
+
+        String connect = "STOMP\r\n" +
+                         "accept-version:1.2\r\n" +
+                         "login:system\r\n" +
+                         "passcode:manager\r\n" +
+                         "\r\n" +
+                         "\u0000\r\n";
+
+        stompConnection.sendFrame(connect);
+
+        String f = stompConnection.receiveFrame();
+        LOG.info("Broker sent: " + f);
+
+        assertTrue(f.startsWith("CONNECTED"));
+        assertTrue(f.indexOf("version:1.2") >= 0);
+        assertTrue(f.indexOf("session:") >= 0);
+
+        // Send some messages
+        for (int n = 0; n < MESSAGE_COUNT; n++) {
+            String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + String.format("%d", n) + Stomp.NULL;
+            stompConnection.sendFrame(message);
+        }
 
+        // Subscribe to the queue
+        String subscribe = "SUBSCRIBE\n" +
+                           "id:1\n" +
+                           "activemq.prefetchSize:1\n" +
+                           "ack:client-individual\n" +
+                           "destination:/queue/" + getQueueName() + "\n" +
+                           "receipt:1\n" +
+                           "\n" + Stomp.NULL;
+
+        stompConnection.sendFrame(subscribe);
+
+        StompFrame receipt = stompConnection.receive();
+        LOG.info("Broker sent: " + receipt);
+        assertTrue(receipt.getAction().startsWith("RECEIPT"));
+        String receiptId = receipt.getHeaders().get("receipt-id");
+        assertEquals("1", receiptId);
+
+        // Receive all messages, each in their own transaction
+        // Ensure we don't have any errors
+        for (int n = 0; n < MESSAGE_COUNT; n++) {
+            StompFrame received = stompConnection.receive();
+            LOG.info("Broker sent: " + received);
+            assertTrue(received.getAction().equals("MESSAGE"));
+            assertTrue(received.getHeaders().containsKey(Stomp.Headers.Message.ACK_ID));
+            assertEquals(String.format("%d", n), received.getBody());
+
+            // Ack & Commit the first message
+            String begin = "BEGIN\n" + "transaction:tx" + String.format("%d", n) + "\n\n" + Stomp.NULL;
+            stompConnection.sendFrame(begin);
+
+            String frame = "ACK\n" + "transaction:tx" + String.format("%d", n) + "\n" + "id:" +
+                    received.getHeaders().get(Stomp.Headers.Message.ACK_ID) + "\n\n" + Stomp.NULL;
+            stompConnection.sendFrame(frame);
+
+            String commit = "COMMIT\n" + "transaction:tx" + String.format("%d", n) + "\n\n" + Stomp.NULL;
+            stompConnection.sendFrame(commit);
+        }
+
+        String frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+        Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return getProxyToBroker().getCurrentConnectionsCount() <= 1;
+            }
+        }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25));
+    }
+
+    @Test(timeout = 60000)
+    public void testQueueBrowerSubscription() throws Exception {
         final int MSG_COUNT = 10;
 
         String connectFrame = "STOMP\n" +
@@ -523,7 +732,6 @@ public class Stomp12Test extends StompTestSupport {
 
     @Test(timeout = 60000)
     public void testSubscribeWithNoId() throws Exception {
-
         String connectFrame = "STOMP\n" +
                               "login:system\n" +
                               "passcode:manager\n" +
@@ -571,7 +779,7 @@ public class Stomp12Test extends StompTestSupport {
 
         long usageStart = brokerService.getSystemUsage().getMemoryUsage().getUsage();
 
-        for(int i = 0; i < MSG_COUNT; ++i) {
+        for (int i = 0; i < MSG_COUNT; ++i) {
             String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" +
                              "receipt:0\n" +
                              "myXkProp:" + bigProp + "\n"+
@@ -593,11 +801,372 @@ public class Stomp12Test extends StompTestSupport {
                            "id:12345\n" + "browser:true\n\n" + Stomp.NULL;
         stompConnection.sendFrame(subscribe);
 
-        for(int i = 0; i < MSG_COUNT; ++i) {
+        for (int i = 0; i < MSG_COUNT; ++i) {
             StompFrame message = stompConnection.receive();
             assertEquals(Stomp.Responses.MESSAGE, message.getAction());
             assertEquals("12345", message.getHeaders().get(Stomp.Headers.Message.SUBSCRIPTION));
         }
+    }
+
+    @Test(timeout = 60000)
+    public void testAckMessagesAfterTransactionAbortClientIndividualAckMode() throws Exception {
+        doTestMessagesRetirementAfterTransactionAbortClientIndividualAckMode(false);
+    }
+
+    @Test(timeout = 60000)
+    public void testNackMessagesAfterTransactionAbortClientIndividualAckMode() throws Exception {
+        doTestMessagesRetirementAfterTransactionAbortClientIndividualAckMode(true);
+    }
+
+    private void doTestMessagesRetirementAfterTransactionAbortClientIndividualAckMode(boolean nack) throws Exception {
+        final int MESSAGE_COUNT = 10;
+
+        stompConnection.setVersion(Stomp.V1_2);
+
+        String connect = "STOMP\r\n" +
+                         "accept-version:1.2\r\n" +
+                         "login:system\r\n" +
+                         "passcode:manager\r\n" +
+                         "\r\n" +
+                         "\u0000\r\n";
+
+        stompConnection.sendFrame(connect);
+
+        String f = stompConnection.receiveFrame();
+        LOG.info("Broker sent: " + f);
+
+        assertTrue(f.startsWith("CONNECTED"));
+        assertTrue(f.indexOf("version:1.2") >= 0);
+        assertTrue(f.indexOf("session:") >= 0);
+
+        // Send some messages
+        for (int n = 0; n < MESSAGE_COUNT; n++) {
+            String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + String.format("%d", n) + Stomp.NULL;
+            stompConnection.sendFrame(message);
+        }
+
+        // Subscribe to the queue
+        String subscribe = "SUBSCRIBE\n" +
+                           "id:1\n" +
+                           "activemq.prefetchSize:1\n" +
+                           "ack:client-individual\n" +
+                           "destination:/queue/" + getQueueName() + "\n" +
+                           "receipt:1\n" +
+                           "\n" + Stomp.NULL;
+
+        stompConnection.sendFrame(subscribe);
+
+        StompFrame receipt = stompConnection.receive();
+        LOG.info("Broker sent: " + receipt);
+        assertTrue(receipt.getAction().startsWith("RECEIPT"));
+        String receiptId = receipt.getHeaders().get("receipt-id");
+        assertEquals("1", receiptId);
+
+        // Start a TX that will later be aborted.
+        String begin = "BEGIN\n" + "transaction:tx1" + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(begin);
+
+        List<String> ackIds = new ArrayList<>(MESSAGE_COUNT);
+
+        for (int n = 0; n < MESSAGE_COUNT; n++) {
+            StompFrame received = stompConnection.receive();
+            LOG.info("Broker sent: " + received);
+            assertTrue(received.getAction().equals("MESSAGE"));
+            assertTrue(received.getHeaders().containsKey(Stomp.Headers.Message.ACK_ID));
+            assertEquals(String.format("%d", n), received.getBody());
+
+            ackIds.add(received.getHeaders().get(Stomp.Headers.Message.ACK_ID));
+
+            String frame = "ACK\n" + "transaction:tx1" + "\n" + "id:" +
+                    received.getHeaders().get(Stomp.Headers.Message.ACK_ID) + "\n\n" + Stomp.NULL;
+            stompConnection.sendFrame(frame);
+        }
+
+        String commit = "ABORT\n" + "transaction:tx1" + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(commit);
+
+        assertTrue(Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return getProxyToQueue(getQueueName()).getQueueSize() == MESSAGE_COUNT;
+            }
+        }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25)));
+
+        for (String ackId : ackIds) {
+            if (nack) {
+                String frame = "NACK\n" + "id:" + ackId + "\n" + "receipt:2" + "\n\n" + Stomp.NULL;
+                stompConnection.sendFrame(frame);
+            } else {
+                String frame = "ACK\n" + "id:" + ackId + "\n" + "receipt:2" + "\n\n" + Stomp.NULL;
+                stompConnection.sendFrame(frame);
+            }
+
+            receipt = stompConnection.receive();
+            LOG.info("Broker sent: " + receipt);
+            assertTrue(receipt.getAction().startsWith("RECEIPT"));
+            receiptId = receipt.getHeaders().get("receipt-id");
+            assertEquals("2", receiptId);
+        }
+
+        assertTrue(Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return getProxyToQueue(getQueueName()).getQueueSize() == 0;
+            }
+        }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25)));
+
+        String frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+        Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return getProxyToBroker().getCurrentConnectionsCount() <= 1;
+            }
+        }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25));
+    }
+
+    @Test(timeout = 60000)
+    public void testAckMessagesAfterTransactionAbortClientAckMode() throws Exception {
+        doTestMessagesRetirementAfterTransactionAbortClientAckMode(false);
+    }
+
+    @Test(timeout = 60000)
+    public void testNackMessagesAfterTransactionAbortClientAckMode() throws Exception {
+        doTestMessagesRetirementAfterTransactionAbortClientAckMode(true);
+    }
+
+    private void doTestMessagesRetirementAfterTransactionAbortClientAckMode(boolean nack) throws Exception {
+        final int MESSAGE_COUNT = 10;
+
+        stompConnection.setVersion(Stomp.V1_2);
+
+        String connect = "STOMP\r\n" +
+                         "accept-version:1.2\r\n" +
+                         "login:system\r\n" +
+                         "passcode:manager\r\n" +
+                         "\r\n" +
+                         "\u0000\r\n";
+
+        stompConnection.sendFrame(connect);
+
+        String f = stompConnection.receiveFrame();
+        LOG.info("Broker sent: " + f);
+
+        assertTrue(f.startsWith("CONNECTED"));
+        assertTrue(f.indexOf("version:1.2") >= 0);
+        assertTrue(f.indexOf("session:") >= 0);
+
+        // Send some messages
+        for (int n = 0; n < MESSAGE_COUNT; n++) {
+            String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + String.format("%d", n) + Stomp.NULL;
+            stompConnection.sendFrame(message);
+        }
+
+        // Subscribe to the queue
+        String subscribe = "SUBSCRIBE\n" +
+                           "id:1\n" +
+                           "activemq.prefetchSize:" + MESSAGE_COUNT + "\n" +
+                           "ack:client\n" +
+                           "destination:/queue/" + getQueueName() + "\n" +
+                           "receipt:1\n" +
+                           "\n" + Stomp.NULL;
+
+        stompConnection.sendFrame(subscribe);
+
+        StompFrame receipt = stompConnection.receive();
+        LOG.info("Broker sent: " + receipt);
+        assertTrue(receipt.getAction().startsWith("RECEIPT"));
+        String receiptId = receipt.getHeaders().get("receipt-id");
+        assertEquals("1", receiptId);
+
+        // Start a TX that will later be aborted.
+        String begin = "BEGIN\n" + "transaction:tx1" + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(begin);
+
+        List<String> ackIds = new ArrayList<>(MESSAGE_COUNT);
+
+        for (int n = 0; n < MESSAGE_COUNT; n++) {
+            StompFrame received = stompConnection.receive();
+            LOG.info("Broker sent: " + received);
+            assertTrue(received.getAction().equals("MESSAGE"));
+            assertTrue(received.getHeaders().containsKey(Stomp.Headers.Message.ACK_ID));
+            assertEquals(String.format("%d", n), received.getBody());
+
+            ackIds.add(received.getHeaders().get(Stomp.Headers.Message.ACK_ID));
+        }
+
+        // Client ACK that enlists all messages in the TX
+        String frame = "ACK\n" + "transaction:tx1" + "\n" + "id:" + ackIds.get(MESSAGE_COUNT - 1) + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        assertTrue(Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return getProxyToQueue(getQueueName()).getQueueSize() == MESSAGE_COUNT;
+            }
+        }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25)));
 
+        String commit = "ABORT\n" + "transaction:tx1" + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(commit);
+
+        assertTrue(Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return getProxyToQueue(getQueueName()).getQueueSize() == MESSAGE_COUNT;
+            }
+        }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25)));
+
+        for (String ackId : ackIds) {
+            if (nack) {
+                frame = "NACK\n" + "id:" + ackId + "\n" + "receipt:2" + "\n\n" + Stomp.NULL;
+                stompConnection.sendFrame(frame);
+            } else {
+                frame = "ACK\n" + "id:" + ackId + "\n" + "receipt:2" + "\n\n" + Stomp.NULL;
+                stompConnection.sendFrame(frame);
+            }
+
+            receipt = stompConnection.receive();
+            LOG.info("Broker sent: " + receipt);
+            assertTrue(receipt.getAction().startsWith("RECEIPT"));
+            receiptId = receipt.getHeaders().get("receipt-id");
+            assertEquals("2", receiptId);
+        }
+
+        assertTrue(Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return getProxyToQueue(getQueueName()).getQueueSize() == 0;
+            }
+        }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25)));
+
+        frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+        Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return getProxyToBroker().getCurrentConnectionsCount() <= 1;
+            }
+        }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25));
+    }
+
+    @Test(timeout = 60000)
+    public void testMixedAckNackWithMessageAckIdsClientAck() throws Exception {
+        doTestMixedAckNackWithMessageAckIds(false);
+    }
+
+    @Test(timeout = 60000)
+    public void testMixedAckNackWithMessageAckIdsClientIndividualAck() throws Exception {
+        doTestMixedAckNackWithMessageAckIds(true);
+    }
+
+    public void doTestMixedAckNackWithMessageAckIds(boolean individual) throws Exception {
+
+        final int MESSAGE_COUNT = 20;
+
+        stompConnection.setVersion(Stomp.V1_2);
+
+        String connect = "STOMP\r\n" +
+                         "accept-version:1.2\r\n" +
+                         "login:system\r\n" +
+                         "passcode:manager\r\n" +
+                         "\r\n" +
+                         "\u0000\r\n";
+
+        stompConnection.sendFrame(connect);
+
+        String f = stompConnection.receiveFrame();
+        LOG.info("Broker sent: " + f);
+
+        assertTrue(f.startsWith("CONNECTED"));
+        assertTrue(f.indexOf("version:1.2") >= 0);
+        assertTrue(f.indexOf("session:") >= 0);
+
+        // Send some messages
+        for (int n = 0; n < MESSAGE_COUNT; n++) {
+            String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + String.format("%d", n) + Stomp.NULL;
+            stompConnection.sendFrame(message);
+        }
+
+        // Subscribe to the queue
+        String subscribe = "SUBSCRIBE\n" +
+                           "id:1\n" +
+                           "activemq.prefetchSize:" + MESSAGE_COUNT + "\n" +
+                           "ack:" + (individual ? "client-individual" : "client") + "\n" +
+                           "destination:/queue/" + getQueueName() + "\n" +
+                           "receipt:1\n" +
+                           "\n" + Stomp.NULL;
+
+        stompConnection.sendFrame(subscribe);
+
+        StompFrame receipt = stompConnection.receive();
+        LOG.info("Broker sent: " + receipt);
+        assertTrue(receipt.getAction().startsWith("RECEIPT"));
+        String receiptId = receipt.getHeaders().get("receipt-id");
+        assertEquals("1", receiptId);
+
+        List<String> ackIds = new ArrayList<>(MESSAGE_COUNT);
+
+        for (int n = 0; n < MESSAGE_COUNT; n++) {
+            StompFrame received = stompConnection.receive();
+            LOG.info("Broker sent: " + received);
+            assertTrue(received.getAction().equals("MESSAGE"));
+            assertTrue(received.getHeaders().containsKey(Stomp.Headers.Message.ACK_ID));
+            assertEquals(String.format("%d", n), received.getBody());
+
+            ackIds.add(received.getHeaders().get(Stomp.Headers.Message.ACK_ID));
+        }
+
+        assertTrue(Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return getProxyToQueue(getQueueName()).getQueueSize() == MESSAGE_COUNT;
+            }
+        }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25)));
+
+        boolean nack = false;
+
+        for (String ackId : ackIds) {
+            if (nack) {
+                String frame = "NACK\n" + "id:" + ackId + "\n" + "receipt:2" + "\n\n" + Stomp.NULL;
+                stompConnection.sendFrame(frame);
+                nack = !nack;
+            } else {
+                String frame = "ACK\n" + "id:" + ackId + "\n" + "receipt:2" + "\n\n" + Stomp.NULL;
+                stompConnection.sendFrame(frame);
+                nack = !nack;
+            }
+
+            receipt = stompConnection.receive();
+            LOG.info("Broker sent: " + receipt);
+            assertTrue(receipt.getAction().startsWith("RECEIPT"));
+            receiptId = receipt.getHeaders().get("receipt-id");
+            assertEquals("2", receiptId);
+        }
+
+        assertTrue(Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return getProxyToQueue(getQueueName()).getQueueSize() == 0;
+            }
+        }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25)));
+
+        String frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+        Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return getProxyToBroker().getCurrentConnectionsCount() <= 1;
+            }
+        }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25));
     }
 }