You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2019/05/29 21:25:50 UTC
[activemq] branch master updated: AMQ-7218 Fix loss of Ack id from
tracking after TX commit / abort
This is an automated email from the ASF dual-hosted git repository.
tabish pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/master by this push:
new 063d24e AMQ-7218 Fix loss of Ack id from tracking after TX commit / abort
063d24e is described below
commit 063d24e6d653bed3dd3c56302758de29143e5e82
Author: Timothy Bish <ta...@gmail.com>
AuthorDate: Wed May 29 17:25:30 2019 -0400
AMQ-7218 Fix loss of Ack id from tracking after TX commit / abort
Ensure that we properly track Ack IDs across TX commit and abort
operations and only clear out values enlisted in the TX on commit and
re-acquire the Ack Ids on TX abort.
---
.../transport/stomp/ProtocolConverter.java | 96 +---
.../activemq/transport/stomp/StompAckEntry.java | 101 ++++
.../stomp/StompQueueBrowserSubscription.java | 11 +-
.../transport/stomp/StompSubscription.java | 133 +++--
.../activemq/transport/stomp/Stomp12Test.java | 599 ++++++++++++++++++++-
5 files changed, 801 insertions(+), 139 deletions(-)
diff --git a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
index 39b6d09..b3deac4 100644
--- a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
+++ b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
@@ -22,7 +22,7 @@ import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
-import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
@@ -116,8 +116,10 @@ public class ProtocolConverter {
private final Map<String, LocalTransactionId> transactions = new ConcurrentHashMap<>();
private final StompTransport stompTransport;
- private final ConcurrentMap<String, AckEntry> pedingAcks = new ConcurrentHashMap<>();
- private final IdGenerator ACK_ID_GENERATOR = new IdGenerator();
+ // Global Map shared with all subscriptions to allow finding the sub associated with an ACK Id
+ private final ConcurrentMap<String, StompAckEntry> pendingAcksTracker = new ConcurrentHashMap<>();
+ // Read-Only view used in this class to enforce the separation of read vs update of the global index.
+ private final Map<String, StompAckEntry> pendingAcks = Collections.unmodifiableMap(pendingAcksTracker);
private final Object commnadIdMutex = new Object();
private int lastCommandId;
@@ -131,34 +133,6 @@ public class ProtocolConverter {
private float hbGracePeriodMultiplier = 1.0f;
private String defaultHeartBeat = Stomp.DEFAULT_HEART_BEAT;
- private static class AckEntry {
-
- private final String messageId;
- private final StompSubscription subscription;
-
- public AckEntry(String messageId, StompSubscription subscription) {
- this.messageId = messageId;
- this.subscription = subscription;
- }
-
- public MessageAck onMessageAck(TransactionId transactionId) {
- return subscription.onStompMessageAck(messageId, transactionId);
- }
-
- public MessageAck onMessageNack(TransactionId transactionId) throws ProtocolException {
- return subscription.onStompMessageNack(messageId, transactionId);
- }
-
- public String getMessageId() {
- return this.messageId;
- }
-
- @SuppressWarnings("unused")
- public StompSubscription getSubscription() {
- return this.subscription;
- }
- }
-
public ProtocolConverter(StompTransport stompTransport, BrokerContext brokerContext) {
this.stompTransport = stompTransport;
this.brokerContext = brokerContext;
@@ -387,9 +361,9 @@ public class ProtocolConverter {
boolean nacked = false;
if (ackId != null) {
- AckEntry pendingAck = this.pedingAcks.remove(ackId);
+ StompAckEntry pendingAck = this.pendingAcks.get(ackId);
if (pendingAck != null) {
- messageId = pendingAck.getMessageId();
+ messageId = pendingAck.getMessageId().toString();
MessageAck ack = pendingAck.onMessageNack(activemqTx);
if (ack != null) {
sendToActiveMQ(ack, createResponseHandler(command));
@@ -443,9 +417,9 @@ public class ProtocolConverter {
boolean acked = false;
if (ackId != null) {
- AckEntry pendingAck = this.pedingAcks.remove(ackId);
+ StompAckEntry pendingAck = this.pendingAcks.get(ackId);
if (pendingAck != null) {
- messageId = pendingAck.getMessageId();
+ messageId = pendingAck.getMessageId().toString();
MessageAck ack = pendingAck.onMessageAck(activemqTx);
if (ack != null) {
sendToActiveMQ(ack, createResponseHandler(command));
@@ -526,8 +500,6 @@ public class ProtocolConverter {
sub.onStompCommit(activemqTx);
}
- pedingAcks.clear();
-
TransactionInfo tx = new TransactionInfo();
tx.setConnectionId(connectionId);
tx.setTransactionId(activemqTx);
@@ -557,8 +529,6 @@ public class ProtocolConverter {
}
}
- pedingAcks.clear();
-
TransactionInfo tx = new TransactionInfo();
tx.setConnectionId(connectionId);
tx.setTransactionId(activemqTx);
@@ -624,9 +594,9 @@ public class ProtocolConverter {
StompSubscription stompSubscription;
if (!consumerInfo.isBrowser()) {
- stompSubscription = new StompSubscription(this, subscriptionId, consumerInfo, headers.get(Stomp.Headers.TRANSFORMATION));
+ stompSubscription = new StompSubscription(this, subscriptionId, consumerInfo, headers.get(Stomp.Headers.TRANSFORMATION), pendingAcksTracker);
} else {
- stompSubscription = new StompQueueBrowserSubscription(this, subscriptionId, consumerInfo, headers.get(Stomp.Headers.TRANSFORMATION));
+ stompSubscription = new StompQueueBrowserSubscription(this, subscriptionId, consumerInfo, headers.get(Stomp.Headers.TRANSFORMATION), pendingAcksTracker);
}
stompSubscription.setDestination(actualDest);
@@ -845,6 +815,7 @@ public class ProtocolConverter {
protected void onStompDisconnect(StompFrame command) throws ProtocolException {
if (connected.get()) {
+ LOG.trace("Connection closed with {} pending ACKs still being tracked.", pendingAcks.size());
sendToActiveMQ(connectionInfo.createRemoveCommand(), createResponseHandler(command));
sendToActiveMQ(new ShutdownInfo(), createResponseHandler(command));
connected.set(false);
@@ -880,19 +851,7 @@ public class ProtocolConverter {
MessageDispatch md = (MessageDispatch)command;
StompSubscription sub = subscriptionsByConsumerId.get(md.getConsumerId());
if (sub != null) {
- String ackId = null;
- if (version.equals(Stomp.V1_2) && sub.getAckMode() != Stomp.Headers.Subscribe.AckModeValues.AUTO && md.getMessage() != null) {
- AckEntry pendingAck = new AckEntry(md.getMessage().getMessageId().toString(), sub);
- ackId = this.ACK_ID_GENERATOR.generateId();
- this.pedingAcks.put(ackId, pendingAck);
- }
- try {
- sub.onMessageDispatch(md, ackId);
- } catch (Exception ex) {
- if (ackId != null) {
- this.pedingAcks.remove(ackId);
- }
- }
+ sub.onMessageDispatch(md);
}
} else if (command.getDataStructureType() == CommandTypes.KEEP_ALIVE_INFO) {
stompTransport.sendToStomp(ping);
@@ -1052,26 +1011,15 @@ public class ProtocolConverter {
return result;
}
- /**
- * Remove all pending acknowledgement markers that are batched into the single
- * client acknowledge operation.
- *
- * @param subscription
- * The STOMP Subscription that has performed a client acknowledge.
- * @param msgIdsToRemove
- * List of message IDs that are bound to the subscription that has ack'd
- */
- protected void afterClientAck(StompSubscription subscription, ArrayList<String> msgIdsToRemove) {
- int count = 0;
-
- for (Map.Entry<String,AckEntry> entry : this.pedingAcks.entrySet()){
- AckEntry actEntry = entry.getValue();
- if (msgIdsToRemove.contains(actEntry.messageId)) {
- this.pedingAcks.remove(entry.getKey());
- count++;
- }
- }
+ boolean isStomp10() {
+ return version.equals(Stomp.V1_0);
+ }
+
+ boolean isStomp11() {
+ return version.equals(Stomp.V1_1);
+ }
- LOG.trace("Subscription:[{}] client acknowledged {} messages", subscription.getSubscriptionId(), count);
+ boolean isStomp12() {
+ return version.equals(Stomp.V1_2);
}
}
diff --git a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompAckEntry.java b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompAckEntry.java
new file mode 100644
index 0000000..1edcb62
--- /dev/null
+++ b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompAckEntry.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp;
+
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.TransactionId;
+
+/**
+ * Tracker object for Messages that carry STOMP v1.2 ACK IDs
+ */
+public class StompAckEntry {
+
+ private final String ackId;
+ private final MessageId messageId;
+ private final StompSubscription subscription;
+ private final MessageDispatch dispatch;
+
+ public StompAckEntry(MessageDispatch dispatch, String ackId, StompSubscription subscription) {
+ this.messageId = dispatch.getMessage().getMessageId();
+ this.subscription = subscription;
+ this.ackId = ackId;
+ this.dispatch = dispatch;
+ }
+
+ public MessageAck onMessageAck(TransactionId transactionId) {
+ return subscription.onStompMessageAck(messageId.toString(), transactionId);
+ }
+
+ public MessageAck onMessageNack(TransactionId transactionId) throws ProtocolException {
+ return subscription.onStompMessageNack(messageId.toString(), transactionId);
+ }
+
+ public MessageId getMessageId() {
+ return this.messageId;
+ }
+
+ public MessageDispatch getMessageDispatch() {
+ return this.dispatch;
+ }
+
+ public String getAckId() {
+ return this.ackId;
+ }
+
+ public StompSubscription getSubscription() {
+ return this.subscription;
+ }
+
+ @Override
+ public String toString() {
+ return "AckEntry[ msgId:" + messageId + ", ackId:" + ackId + ", sub:" + subscription + " ]";
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((messageId == null) ? 0 : messageId.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+
+ StompAckEntry other = (StompAckEntry) obj;
+ if (messageId == null) {
+ if (other.messageId != null) {
+ return false;
+ }
+ } else if (!messageId.equals(other.messageId)) {
+ return false;
+ }
+
+ return true;
+ }
+}
diff --git a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompQueueBrowserSubscription.java b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompQueueBrowserSubscription.java
index 9e267ac..1238572 100644
--- a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompQueueBrowserSubscription.java
+++ b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompQueueBrowserSubscription.java
@@ -17,6 +17,7 @@
package org.apache.activemq.transport.stomp;
import java.io.IOException;
+import java.util.Map;
import javax.jms.JMSException;
@@ -27,15 +28,14 @@ import org.apache.activemq.command.TransactionId;
public class StompQueueBrowserSubscription extends StompSubscription {
- public StompQueueBrowserSubscription(ProtocolConverter stompTransport, String subscriptionId, ConsumerInfo consumerInfo, String transformation) {
- super(stompTransport, subscriptionId, consumerInfo, transformation);
+ public StompQueueBrowserSubscription(ProtocolConverter stompTransport, String subscriptionId, ConsumerInfo consumerInfo, String transformation, Map<String, StompAckEntry> pendingAcks) {
+ super(stompTransport, subscriptionId, consumerInfo, transformation, pendingAcks);
}
@Override
- void onMessageDispatch(MessageDispatch md, String ackId) throws IOException, JMSException {
-
+ void onMessageDispatch(MessageDispatch md) throws IOException, JMSException {
if (md.getMessage() != null) {
- super.onMessageDispatch(md, ackId);
+ super.onMessageDispatch(md);
} else {
StompFrame browseDone = new StompFrame(Stomp.Responses.MESSAGE);
browseDone.getHeaders().put(Stomp.Headers.Message.SUBSCRIPTION, this.getSubscriptionId());
@@ -52,5 +52,4 @@ public class StompQueueBrowserSubscription extends StompSubscription {
public MessageAck onStompMessageNack(String messageId, TransactionId transactionId) throws ProtocolException {
throw new ProtocolException("Cannot Nack a message on a Queue Browser Subscription.");
}
-
}
diff --git a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
index 0d8e308..95fe986 100644
--- a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
+++ b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
@@ -17,12 +17,10 @@
package org.apache.activemq.transport.stomp;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.Map;
-import java.util.Map.Entry;
import javax.jms.JMSException;
@@ -34,6 +32,9 @@ import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.util.IdGenerator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Keeps track of the STOMP subscription so that acking is correctly done.
@@ -42,6 +43,10 @@ import org.apache.activemq.command.TransactionId;
*/
public class StompSubscription {
+ private static final Logger LOG = LoggerFactory.getLogger(StompSubscription.class);
+
+ private static final IdGenerator ACK_ID_GENERATOR = new IdGenerator();
+
public static final String AUTO_ACK = Stomp.Headers.Subscribe.AckModeValues.AUTO;
public static final String CLIENT_ACK = Stomp.Headers.Subscribe.AckModeValues.CLIENT;
public static final String INDIVIDUAL_ACK = Stomp.Headers.Subscribe.AckModeValues.INDIVIDUAL;
@@ -50,27 +55,37 @@ public class StompSubscription {
protected final String subscriptionId;
protected final ConsumerInfo consumerInfo;
- protected final LinkedHashMap<MessageId, MessageDispatch> dispatchedMessage = new LinkedHashMap<>();
- protected final LinkedList<MessageDispatch> unconsumedMessage = new LinkedList<>();
+ protected final Map<MessageId, StompAckEntry> dispatchedMessage = new LinkedHashMap<>();
+ protected final Map<String, StompAckEntry> pendingAcks; // STOMP v1.2 requires ACK ID tracking
+ protected final LinkedList<StompAckEntry> transactedMessages = new LinkedList<>();
protected String ackMode = AUTO_ACK;
protected ActiveMQDestination destination;
protected String transformation;
- public StompSubscription(ProtocolConverter stompTransport, String subscriptionId, ConsumerInfo consumerInfo, String transformation) {
+ public StompSubscription(ProtocolConverter stompTransport, String subscriptionId, ConsumerInfo consumerInfo, String transformation, Map<String, StompAckEntry> pendingAcks) {
this.protocolConverter = stompTransport;
this.subscriptionId = subscriptionId;
this.consumerInfo = consumerInfo;
this.transformation = transformation;
+ this.pendingAcks = pendingAcks;
}
- void onMessageDispatch(MessageDispatch md, String ackId) throws IOException, JMSException {
+ void onMessageDispatch(MessageDispatch md) throws IOException, JMSException {
ActiveMQMessage message = (ActiveMQMessage)md.getMessage();
- if (ackMode.equals(CLIENT_ACK) || ackMode.equals(INDIVIDUAL_ACK)) {
+
+ String ackId = null;
+ if (isClientAck() || isIndividualAck()) {
+ ackId = ACK_ID_GENERATOR.generateId();
+ StompAckEntry pendingAck = new StompAckEntry(md, ackId, this);
+
synchronized (this) {
- dispatchedMessage.put(message.getMessageId(), md);
+ dispatchedMessage.put(message.getMessageId(), pendingAck);
}
- } else if (ackMode.equals(AUTO_ACK)) {
+ if (protocolConverter.isStomp12()) {
+ this.pendingAcks.put(ackId, pendingAck);
+ }
+ } else if (isAutoAck()) {
MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
protocolConverter.getStompTransport().sendToActiveMQ(ack);
}
@@ -93,35 +108,48 @@ public class StompSubscription {
command.getHeaders().put(Stomp.Headers.Message.SUBSCRIPTION, subscriptionId);
}
- if (ackId != null) {
+ if (protocolConverter.isStomp12() && ackId != null) {
command.getHeaders().put(Stomp.Headers.Message.ACK_ID, ackId);
}
- protocolConverter.getStompTransport().sendToStomp(command);
+ try {
+ protocolConverter.getStompTransport().sendToStomp(command);
+ } catch (IOException ex) {
+ if (ackId != null) {
+ pendingAcks.remove(ackId);
+ }
+ throw ex;
+ }
}
synchronized void onStompAbort(TransactionId transactionId) {
- unconsumedMessage.clear();
+ // Restore the pending ACKs so that their ACK IDs are again valid for a client
+ // to operate on.
+ LOG.trace("Transaction Abort restoring {} pending ACKs to valid state.", transactedMessages.size());
+ for (StompAckEntry ackEntry : transactedMessages) {
+ if (protocolConverter.isStomp12()) {
+ pendingAcks.put(ackEntry.getAckId(), ackEntry);
+ }
+ }
+ transactedMessages.clear();
}
void onStompCommit(TransactionId transactionId) {
MessageAck ack = null;
synchronized (this) {
- for (Iterator<?> iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) {
- @SuppressWarnings("rawtypes")
- Map.Entry entry = (Entry)iter.next();
- MessageDispatch msg = (MessageDispatch)entry.getValue();
- if (unconsumedMessage.contains(msg)) {
- iter.remove();
+ for (Iterator<StompAckEntry> iterator = dispatchedMessage.values().iterator(); iterator.hasNext();) {
+ StompAckEntry ackEntry = iterator.next();
+ if (transactedMessages.contains(ackEntry)) {
+ iterator.remove();
}
}
// For individual Ack we already sent an Ack that will be applied on commit
// we don't send a second standard Ack as that would produce an error.
- if (!unconsumedMessage.isEmpty() && ackMode == CLIENT_ACK) {
- ack = new MessageAck(unconsumedMessage.getLast(), MessageAck.STANDARD_ACK_TYPE, unconsumedMessage.size());
+ if (!transactedMessages.isEmpty() && isClientAck()) {
+ ack = new MessageAck(transactedMessages.getLast().getMessageDispatch(), MessageAck.STANDARD_ACK_TYPE, transactedMessages.size());
ack.setTransactionId(transactionId);
- unconsumedMessage.clear();
+ transactedMessages.clear();
}
}
// avoid contention with onMessageDispatch
@@ -131,10 +159,10 @@ public class StompSubscription {
}
synchronized MessageAck onStompMessageAck(String messageId, TransactionId transactionId) {
-
MessageId msgId = new MessageId(messageId);
- if (!dispatchedMessage.containsKey(msgId)) {
+ final StompAckEntry ackEntry = dispatchedMessage.get(msgId);
+ if (ackEntry == null) {
return null;
}
@@ -142,35 +170,33 @@ public class StompSubscription {
ack.setDestination(consumerInfo.getDestination());
ack.setConsumerId(consumerInfo.getConsumerId());
- final ArrayList<String> acknowledgedMessages = new ArrayList<>();
-
- if (ackMode == CLIENT_ACK) {
+ if (isClientAck()) {
if (transactionId == null) {
ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
} else {
ack.setAckType(MessageAck.DELIVERED_ACK_TYPE);
}
int count = 0;
- for (Iterator<?> iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) {
+ for (Iterator<StompAckEntry> iterator = dispatchedMessage.values().iterator(); iterator.hasNext();) {
+ StompAckEntry entry = iterator.next();
+ MessageId current = entry.getMessageId();
- @SuppressWarnings("rawtypes")
- Map.Entry entry = (Entry)iter.next();
- MessageId id = (MessageId)entry.getKey();
- MessageDispatch msg = (MessageDispatch)entry.getValue();
+ if (entry.getAckId() != null) {
+ pendingAcks.remove(entry.getAckId());
+ }
if (transactionId != null) {
- if (!unconsumedMessage.contains(msg)) {
- unconsumedMessage.add(msg);
+ if (!transactedMessages.contains(entry)) {
+ transactedMessages.add(entry);
count++;
}
} else {
- acknowledgedMessages.add(id.toString());
- iter.remove();
+ iterator.remove();
count++;
}
- if (id.equals(msgId)) {
- ack.setLastMessageId(id);
+ if (current.equals(msgId)) {
+ ack.setLastMessageId(current);
break;
}
}
@@ -178,14 +204,15 @@ public class StompSubscription {
if (transactionId != null) {
ack.setTransactionId(transactionId);
}
-
- this.protocolConverter.afterClientAck(this, acknowledgedMessages);
- } else if (ackMode == INDIVIDUAL_ACK) {
+ } else if (isIndividualAck()) {
+ if (ackEntry.getAckId() != null) {
+ pendingAcks.remove(ackEntry.getAckId());
+ }
ack.setAckType(MessageAck.INDIVIDUAL_ACK_TYPE);
ack.setMessageID(msgId);
ack.setMessageCount(1);
if (transactionId != null) {
- unconsumedMessage.add(dispatchedMessage.get(msgId));
+ transactedMessages.add(dispatchedMessage.get(msgId));
ack.setTransactionId(transactionId);
} else {
dispatchedMessage.remove(msgId);
@@ -196,23 +223,29 @@ public class StompSubscription {
}
public MessageAck onStompMessageNack(String messageId, TransactionId transactionId) throws ProtocolException {
-
MessageId msgId = new MessageId(messageId);
if (!dispatchedMessage.containsKey(msgId)) {
return null;
}
+ final StompAckEntry ackEntry = dispatchedMessage.get(msgId);
+
+ if (ackEntry.getAckId() != null) {
+ pendingAcks.remove(ackEntry.getAckId());
+ }
+
MessageAck ack = new MessageAck();
ack.setDestination(consumerInfo.getDestination());
ack.setConsumerId(consumerInfo.getConsumerId());
ack.setAckType(MessageAck.POSION_ACK_TYPE);
ack.setMessageID(msgId);
if (transactionId != null) {
- unconsumedMessage.add(dispatchedMessage.get(msgId));
+ transactedMessages.add(ackEntry);
ack.setTransactionId(transactionId);
+ } else {
+ dispatchedMessage.remove(msgId);
}
- dispatchedMessage.remove(msgId);
return ack;
}
@@ -225,6 +258,18 @@ public class StompSubscription {
this.ackMode = ackMode;
}
+ public boolean isAutoAck() {
+ return ackMode.equals(AUTO_ACK);
+ }
+
+ public boolean isClientAck() {
+ return ackMode.equals(CLIENT_ACK);
+ }
+
+ public boolean isIndividualAck() {
+ return ackMode.equals(INDIVIDUAL_ACK);
+ }
+
public String getSubscriptionId() {
return subscriptionId;
}
diff --git a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12Test.java b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12Test.java
index b7560c7..3944c50 100644
--- a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12Test.java
+++ b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12Test.java
@@ -21,7 +21,9 @@ import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.net.Socket;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.List;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
@@ -40,7 +42,6 @@ public class Stomp12Test extends StompTestSupport {
@Override
public void setUp() throws Exception {
-
super.setUp();
stompConnect();
@@ -70,7 +71,6 @@ public class Stomp12Test extends StompTestSupport {
@Test(timeout = 60000)
public void testTelnetStyleSends() throws Exception {
-
stompConnection.setVersion(Stomp.V1_2);
String connect = "CONNECT\r\n" +
@@ -107,7 +107,6 @@ public class Stomp12Test extends StompTestSupport {
@Test(timeout = 60000)
public void testClientAckWithoutAckId() throws Exception {
-
stompConnection.setVersion(Stomp.V1_2);
String connect = "STOMP\r\n" +
@@ -150,18 +149,40 @@ public class Stomp12Test extends StompTestSupport {
assertTrue(received.getHeaders().containsKey(Stomp.Headers.Message.ACK_ID));
assertEquals("1", received.getBody());
- String frame = "ACK\n" + "message-id:" +
- received.getHeaders().get(Stomp.Headers.Message.ACK_ID) + "\n\n" + Stomp.NULL;
+ String ackId = received.getHeaders().get(Stomp.Headers.Message.ACK_ID);
+
+ // Put ACK ID in wrong header
+ String frame = "ACK\n" + "message-id:" + ackId + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
received = stompConnection.receive();
assertTrue(received.getAction().equals("ERROR"));
LOG.info("Broker sent: " + received);
+
+ // Now place it in the correct location and check it still works.
+ frame = "ACK\n" + "id:" + ackId + "\n" + "receipt:2\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+
+ receipt = stompConnection.receive();
+ LOG.info("Broker sent: " + receipt);
+ assertTrue(receipt.getAction().startsWith("RECEIPT"));
+ receiptId = receipt.getHeaders().get("receipt-id");
+ assertEquals("2", receiptId);
+
+ assertTrue(Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return getProxyToQueue(getQueueName()).getQueueSize() == 0;
+ }
+ }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25)));
+
+ frame = "DISCONNECT\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
}
@Test(timeout = 60000)
public void testClientAck() throws Exception {
-
stompConnection.setVersion(Stomp.V1_2);
String connect = "STOMP\r\n" +
@@ -255,11 +276,106 @@ public class Stomp12Test extends StompTestSupport {
frame = "ACK\n" + "id:" +
received.getHeaders().get(Stomp.Headers.Message.ACK_ID) + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
+
+ assertTrue(Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return getProxyToQueue(getQueueName()).getQueueSize() == 0;
+ }
+ }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25)));
+
+ frame = "DISCONNECT\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
}
@Test(timeout = 60000)
- public void testClientIndividualAck() throws Exception {
+ public void testClientAckMultipleMessagesWithSingleAck() throws Exception {
+ final int MESSAGE_COUNT = 10;
+
+ stompConnection.setVersion(Stomp.V1_2);
+
+ String connect = "STOMP\r\n" +
+ "accept-version:1.2\r\n" +
+ "login:system\r\n" +
+ "passcode:manager\r\n" +
+ "\r\n" +
+ "\u0000\r\n";
+
+ stompConnection.sendFrame(connect);
+
+ String f = stompConnection.receiveFrame();
+ LOG.info("Broker sent: " + f);
+
+ assertTrue(f.startsWith("CONNECTED"));
+ assertTrue(f.indexOf("version:1.2") >= 0);
+ assertTrue(f.indexOf("session:") >= 0);
+ // Send some messages
+ for (int n = 0; n < MESSAGE_COUNT; n++) {
+ String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + String.format("%d", n) + Stomp.NULL;
+ stompConnection.sendFrame(message);
+ }
+
+ String subscribe = "SUBSCRIBE\n" +
+ "id:1\n" +
+ "ack:client\n" +
+ "destination:/queue/" + getQueueName() + "\n" +
+ "receipt:1\n" +
+ "\n" + Stomp.NULL;
+
+ stompConnection.sendFrame(subscribe);
+
+ StompFrame receipt = stompConnection.receive();
+ LOG.info("Broker sent: " + receipt);
+ assertTrue(receipt.getAction().startsWith("RECEIPT"));
+ String receiptId = receipt.getHeaders().get("receipt-id");
+ assertEquals("1", receiptId);
+ assertEquals(MESSAGE_COUNT, getProxyToQueue(getQueueName()).getQueueSize());
+
+ String lastAckId = null;
+
+ for (int n = 0; n < MESSAGE_COUNT; n++) {
+ StompFrame received = stompConnection.receive();
+ LOG.info("Broker sent: " + received);
+ assertTrue(received.getAction().equals("MESSAGE"));
+ assertTrue(received.getHeaders().containsKey(Stomp.Headers.Message.ACK_ID));
+ assertEquals(String.format("%d", n), received.getBody());
+
+ lastAckId = received.getHeaders().get(Stomp.Headers.Message.ACK_ID);
+ }
+
+ String frame = "ACK\n" + "id:" + lastAckId + "\n" + "receipt:2" + "\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+
+ receipt = stompConnection.receive();
+ LOG.info("Broker sent: " + receipt);
+ assertTrue(receipt.getAction().startsWith("RECEIPT"));
+ receiptId = receipt.getHeaders().get("receipt-id");
+ assertEquals("2", receiptId);
+
+ assertTrue(Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return getProxyToQueue(getQueueName()).getQueueSize() == 0;
+ }
+ }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25)));
+
+ frame = "DISCONNECT\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+
+ assertTrue(Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return getProxyToBroker().getCurrentConnectionsCount() == 1;
+ }
+ }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25)));
+ }
+
+ @Test(timeout = 60000)
+ public void testClientIndividualAck() throws Exception {
stompConnection.setVersion(Stomp.V1_2);
String connect = "STOMP\r\n" +
@@ -345,24 +461,117 @@ public class Stomp12Test extends StompTestSupport {
assertTrue(received.getAction().equals("MESSAGE"));
assertTrue(received.getHeaders().containsKey(Stomp.Headers.Message.ACK_ID));
assertEquals("1", received.getBody());
+ String message1AckId = received.getHeaders().get(Stomp.Headers.Message.ACK_ID);
- frame = "ACK\n" + "id:" +
- received.getHeaders().get(Stomp.Headers.Message.ACK_ID) + "\n\n" + Stomp.NULL;
+ frame = "ACK\n" + "id:" + message1AckId + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
received = stompConnection.receive();
assertTrue(received.getAction().equals("MESSAGE"));
assertTrue(received.getHeaders().containsKey(Stomp.Headers.Message.ACK_ID));
assertEquals("3", received.getBody());
+ String message3AckId = received.getHeaders().get(Stomp.Headers.Message.ACK_ID);
- frame = "ACK\n" + "id:" +
- received.getHeaders().get(Stomp.Headers.Message.ACK_ID) + "\n\n" + Stomp.NULL;
+ frame = "ACK\n" + "id:" + message3AckId + "\n" + "receipt:2" + "\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+
+ receipt = stompConnection.receive();
+ LOG.info("Broker sent: " + receipt);
+ assertTrue(receipt.getAction().startsWith("RECEIPT"));
+ receiptId = receipt.getHeaders().get("receipt-id");
+ assertEquals("2", receiptId);
+
+ assertTrue(Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return getProxyToQueue(getQueueName()).getQueueSize() == 0;
+ }
+ }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25)));
+
+ frame = "DISCONNECT\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
@Test(timeout = 60000)
- public void testQueueBrowerSubscription() throws Exception {
+ public void testRepeatedClientIndividualAckInMultipleTransactions() throws Exception {
+ final int MESSAGE_COUNT = 50;
+
+ stompConnection.setVersion(Stomp.V1_2);
+
+ String connect = "STOMP\r\n" +
+ "accept-version:1.2\r\n" +
+ "login:system\r\n" +
+ "passcode:manager\r\n" +
+ "\r\n" +
+ "\u0000\r\n";
+
+ stompConnection.sendFrame(connect);
+
+ String f = stompConnection.receiveFrame();
+ LOG.info("Broker sent: " + f);
+
+ assertTrue(f.startsWith("CONNECTED"));
+ assertTrue(f.indexOf("version:1.2") >= 0);
+ assertTrue(f.indexOf("session:") >= 0);
+
+ // Send some messages
+ for (int n = 0; n < MESSAGE_COUNT; n++) {
+ String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + String.format("%d", n) + Stomp.NULL;
+ stompConnection.sendFrame(message);
+ }
+ // Subscribe to the queue
+ String subscribe = "SUBSCRIBE\n" +
+ "id:1\n" +
+ "activemq.prefetchSize:1\n" +
+ "ack:client-individual\n" +
+ "destination:/queue/" + getQueueName() + "\n" +
+ "receipt:1\n" +
+ "\n" + Stomp.NULL;
+
+ stompConnection.sendFrame(subscribe);
+
+ StompFrame receipt = stompConnection.receive();
+ LOG.info("Broker sent: " + receipt);
+ assertTrue(receipt.getAction().startsWith("RECEIPT"));
+ String receiptId = receipt.getHeaders().get("receipt-id");
+ assertEquals("1", receiptId);
+
+ // Receive all messages, each in their own transaction
+ // Ensure we don't have any errors
+ for (int n = 0; n < MESSAGE_COUNT; n++) {
+ StompFrame received = stompConnection.receive();
+ LOG.info("Broker sent: " + received);
+ assertTrue(received.getAction().equals("MESSAGE"));
+ assertTrue(received.getHeaders().containsKey(Stomp.Headers.Message.ACK_ID));
+ assertEquals(String.format("%d", n), received.getBody());
+
+ // Ack & Commit the first message
+ String begin = "BEGIN\n" + "transaction:tx" + String.format("%d", n) + "\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(begin);
+
+ String frame = "ACK\n" + "transaction:tx" + String.format("%d", n) + "\n" + "id:" +
+ received.getHeaders().get(Stomp.Headers.Message.ACK_ID) + "\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+
+ String commit = "COMMIT\n" + "transaction:tx" + String.format("%d", n) + "\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(commit);
+ }
+
+ String frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+ Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return getProxyToBroker().getCurrentConnectionsCount() <= 1;
+ }
+ }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25));
+ }
+
+ @Test(timeout = 60000)
+ public void testQueueBrowerSubscription() throws Exception {
final int MSG_COUNT = 10;
String connectFrame = "STOMP\n" +
@@ -523,7 +732,6 @@ public class Stomp12Test extends StompTestSupport {
@Test(timeout = 60000)
public void testSubscribeWithNoId() throws Exception {
-
String connectFrame = "STOMP\n" +
"login:system\n" +
"passcode:manager\n" +
@@ -571,7 +779,7 @@ public class Stomp12Test extends StompTestSupport {
long usageStart = brokerService.getSystemUsage().getMemoryUsage().getUsage();
- for(int i = 0; i < MSG_COUNT; ++i) {
+ for (int i = 0; i < MSG_COUNT; ++i) {
String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" +
"receipt:0\n" +
"myXkProp:" + bigProp + "\n"+
@@ -593,11 +801,372 @@ public class Stomp12Test extends StompTestSupport {
"id:12345\n" + "browser:true\n\n" + Stomp.NULL;
stompConnection.sendFrame(subscribe);
- for(int i = 0; i < MSG_COUNT; ++i) {
+ for (int i = 0; i < MSG_COUNT; ++i) {
StompFrame message = stompConnection.receive();
assertEquals(Stomp.Responses.MESSAGE, message.getAction());
assertEquals("12345", message.getHeaders().get(Stomp.Headers.Message.SUBSCRIPTION));
}
+ }
+
+ @Test(timeout = 60000)
+ public void testAckMessagesAfterTransactionAbortClientIndividualAckMode() throws Exception {
+ doTestMessagesRetirementAfterTransactionAbortClientIndividualAckMode(false);
+ }
+
+ @Test(timeout = 60000)
+ public void testNackMessagesAfterTransactionAbortClientIndividualAckMode() throws Exception {
+ doTestMessagesRetirementAfterTransactionAbortClientIndividualAckMode(true);
+ }
+
+ private void doTestMessagesRetirementAfterTransactionAbortClientIndividualAckMode(boolean nack) throws Exception {
+ final int MESSAGE_COUNT = 10;
+
+ stompConnection.setVersion(Stomp.V1_2);
+
+ String connect = "STOMP\r\n" +
+ "accept-version:1.2\r\n" +
+ "login:system\r\n" +
+ "passcode:manager\r\n" +
+ "\r\n" +
+ "\u0000\r\n";
+
+ stompConnection.sendFrame(connect);
+
+ String f = stompConnection.receiveFrame();
+ LOG.info("Broker sent: " + f);
+
+ assertTrue(f.startsWith("CONNECTED"));
+ assertTrue(f.indexOf("version:1.2") >= 0);
+ assertTrue(f.indexOf("session:") >= 0);
+
+ // Send some messages
+ for (int n = 0; n < MESSAGE_COUNT; n++) {
+ String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + String.format("%d", n) + Stomp.NULL;
+ stompConnection.sendFrame(message);
+ }
+
+ // Subscribe to the queue
+ String subscribe = "SUBSCRIBE\n" +
+ "id:1\n" +
+ "activemq.prefetchSize:1\n" +
+ "ack:client-individual\n" +
+ "destination:/queue/" + getQueueName() + "\n" +
+ "receipt:1\n" +
+ "\n" + Stomp.NULL;
+
+ stompConnection.sendFrame(subscribe);
+
+ StompFrame receipt = stompConnection.receive();
+ LOG.info("Broker sent: " + receipt);
+ assertTrue(receipt.getAction().startsWith("RECEIPT"));
+ String receiptId = receipt.getHeaders().get("receipt-id");
+ assertEquals("1", receiptId);
+
+ // Start a TX that will later be aborted.
+ String begin = "BEGIN\n" + "transaction:tx1" + "\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(begin);
+
+ List<String> ackIds = new ArrayList<>(MESSAGE_COUNT);
+
+ for (int n = 0; n < MESSAGE_COUNT; n++) {
+ StompFrame received = stompConnection.receive();
+ LOG.info("Broker sent: " + received);
+ assertTrue(received.getAction().equals("MESSAGE"));
+ assertTrue(received.getHeaders().containsKey(Stomp.Headers.Message.ACK_ID));
+ assertEquals(String.format("%d", n), received.getBody());
+
+ ackIds.add(received.getHeaders().get(Stomp.Headers.Message.ACK_ID));
+
+ String frame = "ACK\n" + "transaction:tx1" + "\n" + "id:" +
+ received.getHeaders().get(Stomp.Headers.Message.ACK_ID) + "\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+ }
+
+ String commit = "ABORT\n" + "transaction:tx1" + "\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(commit);
+
+ assertTrue(Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return getProxyToQueue(getQueueName()).getQueueSize() == MESSAGE_COUNT;
+ }
+ }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25)));
+
+ for (String ackId : ackIds) {
+ if (nack) {
+ String frame = "NACK\n" + "id:" + ackId + "\n" + "receipt:2" + "\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+ } else {
+ String frame = "ACK\n" + "id:" + ackId + "\n" + "receipt:2" + "\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+ }
+
+ receipt = stompConnection.receive();
+ LOG.info("Broker sent: " + receipt);
+ assertTrue(receipt.getAction().startsWith("RECEIPT"));
+ receiptId = receipt.getHeaders().get("receipt-id");
+ assertEquals("2", receiptId);
+ }
+
+ assertTrue(Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return getProxyToQueue(getQueueName()).getQueueSize() == 0;
+ }
+ }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25)));
+
+ String frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+ Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return getProxyToBroker().getCurrentConnectionsCount() <= 1;
+ }
+ }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25));
+ }
+
+ @Test(timeout = 60000)
+ public void testAckMessagesAfterTransactionAbortClientAckMode() throws Exception {
+ doTestMessagesRetirementAfterTransactionAbortClientAckMode(false);
+ }
+
+ @Test(timeout = 60000)
+ public void testNackMessagesAfterTransactionAbortClientAckMode() throws Exception {
+ doTestMessagesRetirementAfterTransactionAbortClientAckMode(true);
+ }
+
+ private void doTestMessagesRetirementAfterTransactionAbortClientAckMode(boolean nack) throws Exception {
+ final int MESSAGE_COUNT = 10;
+
+ stompConnection.setVersion(Stomp.V1_2);
+
+ String connect = "STOMP\r\n" +
+ "accept-version:1.2\r\n" +
+ "login:system\r\n" +
+ "passcode:manager\r\n" +
+ "\r\n" +
+ "\u0000\r\n";
+
+ stompConnection.sendFrame(connect);
+
+ String f = stompConnection.receiveFrame();
+ LOG.info("Broker sent: " + f);
+
+ assertTrue(f.startsWith("CONNECTED"));
+ assertTrue(f.indexOf("version:1.2") >= 0);
+ assertTrue(f.indexOf("session:") >= 0);
+
+ // Send some messages
+ for (int n = 0; n < MESSAGE_COUNT; n++) {
+ String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + String.format("%d", n) + Stomp.NULL;
+ stompConnection.sendFrame(message);
+ }
+
+ // Subscribe to the queue
+ String subscribe = "SUBSCRIBE\n" +
+ "id:1\n" +
+ "activemq.prefetchSize:" + MESSAGE_COUNT + "\n" +
+ "ack:client\n" +
+ "destination:/queue/" + getQueueName() + "\n" +
+ "receipt:1\n" +
+ "\n" + Stomp.NULL;
+
+ stompConnection.sendFrame(subscribe);
+
+ StompFrame receipt = stompConnection.receive();
+ LOG.info("Broker sent: " + receipt);
+ assertTrue(receipt.getAction().startsWith("RECEIPT"));
+ String receiptId = receipt.getHeaders().get("receipt-id");
+ assertEquals("1", receiptId);
+
+ // Start a TX that will later be aborted.
+ String begin = "BEGIN\n" + "transaction:tx1" + "\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(begin);
+
+ List<String> ackIds = new ArrayList<>(MESSAGE_COUNT);
+
+ for (int n = 0; n < MESSAGE_COUNT; n++) {
+ StompFrame received = stompConnection.receive();
+ LOG.info("Broker sent: " + received);
+ assertTrue(received.getAction().equals("MESSAGE"));
+ assertTrue(received.getHeaders().containsKey(Stomp.Headers.Message.ACK_ID));
+ assertEquals(String.format("%d", n), received.getBody());
+
+ ackIds.add(received.getHeaders().get(Stomp.Headers.Message.ACK_ID));
+ }
+
+ // Client ACK that enlists all messages in the TX
+ String frame = "ACK\n" + "transaction:tx1" + "\n" + "id:" + ackIds.get(MESSAGE_COUNT - 1) + "\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+
+ assertTrue(Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return getProxyToQueue(getQueueName()).getQueueSize() == MESSAGE_COUNT;
+ }
+ }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25)));
+ String commit = "ABORT\n" + "transaction:tx1" + "\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(commit);
+
+ assertTrue(Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return getProxyToQueue(getQueueName()).getQueueSize() == MESSAGE_COUNT;
+ }
+ }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25)));
+
+ for (String ackId : ackIds) {
+ if (nack) {
+ frame = "NACK\n" + "id:" + ackId + "\n" + "receipt:2" + "\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+ } else {
+ frame = "ACK\n" + "id:" + ackId + "\n" + "receipt:2" + "\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+ }
+
+ receipt = stompConnection.receive();
+ LOG.info("Broker sent: " + receipt);
+ assertTrue(receipt.getAction().startsWith("RECEIPT"));
+ receiptId = receipt.getHeaders().get("receipt-id");
+ assertEquals("2", receiptId);
+ }
+
+ assertTrue(Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return getProxyToQueue(getQueueName()).getQueueSize() == 0;
+ }
+ }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25)));
+
+ frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+ Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return getProxyToBroker().getCurrentConnectionsCount() <= 1;
+ }
+ }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25));
+ }
+
+ @Test(timeout = 60000)
+ public void testMixedAckNackWithMessageAckIdsClientAck() throws Exception {
+ doTestMixedAckNackWithMessageAckIds(false);
+ }
+
+ @Test(timeout = 60000)
+ public void testMixedAckNackWithMessageAckIdsClientIndividualAck() throws Exception {
+ doTestMixedAckNackWithMessageAckIds(true);
+ }
+
+ public void doTestMixedAckNackWithMessageAckIds(boolean individual) throws Exception {
+
+ final int MESSAGE_COUNT = 20;
+
+ stompConnection.setVersion(Stomp.V1_2);
+
+ String connect = "STOMP\r\n" +
+ "accept-version:1.2\r\n" +
+ "login:system\r\n" +
+ "passcode:manager\r\n" +
+ "\r\n" +
+ "\u0000\r\n";
+
+ stompConnection.sendFrame(connect);
+
+ String f = stompConnection.receiveFrame();
+ LOG.info("Broker sent: " + f);
+
+ assertTrue(f.startsWith("CONNECTED"));
+ assertTrue(f.indexOf("version:1.2") >= 0);
+ assertTrue(f.indexOf("session:") >= 0);
+
+ // Send some messages
+ for (int n = 0; n < MESSAGE_COUNT; n++) {
+ String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + String.format("%d", n) + Stomp.NULL;
+ stompConnection.sendFrame(message);
+ }
+
+ // Subscribe to the queue
+ String subscribe = "SUBSCRIBE\n" +
+ "id:1\n" +
+ "activemq.prefetchSize:" + MESSAGE_COUNT + "\n" +
+ "ack:" + (individual ? "client-individual" : "client") + "\n" +
+ "destination:/queue/" + getQueueName() + "\n" +
+ "receipt:1\n" +
+ "\n" + Stomp.NULL;
+
+ stompConnection.sendFrame(subscribe);
+
+ StompFrame receipt = stompConnection.receive();
+ LOG.info("Broker sent: " + receipt);
+ assertTrue(receipt.getAction().startsWith("RECEIPT"));
+ String receiptId = receipt.getHeaders().get("receipt-id");
+ assertEquals("1", receiptId);
+
+ List<String> ackIds = new ArrayList<>(MESSAGE_COUNT);
+
+ for (int n = 0; n < MESSAGE_COUNT; n++) {
+ StompFrame received = stompConnection.receive();
+ LOG.info("Broker sent: " + received);
+ assertTrue(received.getAction().equals("MESSAGE"));
+ assertTrue(received.getHeaders().containsKey(Stomp.Headers.Message.ACK_ID));
+ assertEquals(String.format("%d", n), received.getBody());
+
+ ackIds.add(received.getHeaders().get(Stomp.Headers.Message.ACK_ID));
+ }
+
+ assertTrue(Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return getProxyToQueue(getQueueName()).getQueueSize() == MESSAGE_COUNT;
+ }
+ }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25)));
+
+ boolean nack = false;
+
+ for (String ackId : ackIds) {
+ if (nack) {
+ String frame = "NACK\n" + "id:" + ackId + "\n" + "receipt:2" + "\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+ nack = !nack;
+ } else {
+ String frame = "ACK\n" + "id:" + ackId + "\n" + "receipt:2" + "\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+ nack = !nack;
+ }
+
+ receipt = stompConnection.receive();
+ LOG.info("Broker sent: " + receipt);
+ assertTrue(receipt.getAction().startsWith("RECEIPT"));
+ receiptId = receipt.getHeaders().get("receipt-id");
+ assertEquals("2", receiptId);
+ }
+
+ assertTrue(Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return getProxyToQueue(getQueueName()).getQueueSize() == 0;
+ }
+ }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25)));
+
+ String frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+ Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return getProxyToBroker().getCurrentConnectionsCount() <= 1;
+ }
+ }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25));
}
}