You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2021/02/22 22:32:55 UTC
[activemq-artemis] branch master updated: ARTEMIS-3093 Ordering on multiple consumers and core with rollback
This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new 12c8096 ARTEMIS-3093 Ordering on multiple consumers and core with rollback
new 4de4329c This closes #3463
12c8096 is described below
commit 12c8096a23840ede9c364cc184dddfe19846e2e0
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Mon Feb 22 09:36:55 2021 -0500
ARTEMIS-3093 Ordering on multiple consumers and core with rollback
---
.../protocol/amqp/broker/AMQPSessionCallback.java | 4 +-
.../proton/transaction/ProtonTransactionImpl.java | 2 +-
.../core/protocol/mqtt/MQTTPublishManager.java | 2 +-
.../apache/activemq/artemis/core/server/Queue.java | 3 +-
.../artemis/core/server/ServerConsumer.java | 4 +-
.../core/server/cluster/impl/BridgeImpl.java | 2 +-
.../artemis/core/server/impl/LastValueQueue.java | 13 +++
.../artemis/core/server/impl/QueueImpl.java | 33 +++----
.../artemis/core/server/impl/RefsOperation.java | 7 +-
.../core/server/impl/ServerConsumerImpl.java | 13 +--
.../core/server/impl/ServerSessionImpl.java | 2 +-
.../core/transaction/TransactionOperation.java | 4 -
.../core/transaction/impl/TransactionImpl.java | 43 +++------
.../server/impl/ScheduledDeliveryHandlerTest.java | 2 +-
.../tests/integration/cli/DummyServerConsumer.java | 7 +-
.../tests/integration/client/JMSOrderTest.java | 104 +++++++++++++++++++++
.../tests/integration/server/RingQueueTest.java | 25 +++--
.../tests/unit/core/postoffice/impl/FakeQueue.java | 2 +-
18 files changed, 177 insertions(+), 95 deletions(-)
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index eb62855..f0fb1ef 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -398,7 +398,7 @@ public class AMQPSessionCallback implements SessionCallback {
public void closeSender(final Object brokerConsumer) throws Exception {
final ServerConsumer consumer = ((ServerConsumer) brokerConsumer);
- consumer.close(false, true);
+ consumer.close(false);
consumer.getQueue().recheckRefCount(serverSession.getSessionContext());
}
@@ -440,7 +440,7 @@ public class AMQPSessionCallback implements SessionCallback {
public void cancel(Object brokerConsumer, Message message, boolean updateCounts) throws Exception {
OperationContext oldContext = recoverContext();
try {
- ((ServerConsumer) brokerConsumer).individualCancel(message.getMessageID(), updateCounts, true);
+ ((ServerConsumer) brokerConsumer).individualCancel(message.getMessageID(), updateCounts);
((ServerConsumer) brokerConsumer).getQueue().forceDelivery();
} finally {
resetContext(oldContext);
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionImpl.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionImpl.java
index 83128e1..123dbb5 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionImpl.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionImpl.java
@@ -50,7 +50,7 @@ public class ProtonTransactionImpl extends TransactionImpl {
private boolean discharged;
public ProtonTransactionImpl(final Xid xid, final StorageManager storageManager, final int timeoutSeconds, final AMQPConnectionContext connection) {
- super(xid, storageManager, timeoutSeconds, true);
+ super(xid, storageManager, timeoutSeconds);
addOperation(new TransactionOperationAbstract() {
@Override
public void afterCommit(Transaction tx) {
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
index 4b89636..5d9c96d 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
@@ -133,7 +133,7 @@ public class MQTTPublishManager {
sendServerMessage(mqttid, message, deliveryCount, qos);
} else {
// Client must have disconnected and it's Subscription QoS cleared
- consumer.individualCancel(message.getMessageID(), false, true);
+ consumer.individualCancel(message.getMessageID(), false);
}
}
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
index 628e43e..5b1d128 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
@@ -220,8 +220,7 @@ public interface Queue extends Bindable,CriticalComponent {
void cancel(Transaction tx, MessageReference ref, boolean ignoreRedeliveryCheck);
- /** @param sorted it should use the messageID as a reference to where to add it in the queue */
- void cancel(MessageReference reference, long timeBase, boolean sorted) throws Exception;
+ void cancel(MessageReference reference, long timeBase) throws Exception;
void deliverAsync();
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java
index 8528f68..e743c04 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java
@@ -64,8 +64,6 @@ public interface ServerConsumer extends Consumer, ConsumerInfo {
void close(boolean failed) throws Exception;
- void close(boolean failed, boolean sorted) throws Exception;
-
/**
* This method is just to remove itself from Queues.
* If for any reason during a close an exception occurred, the exception treatment
@@ -101,7 +99,7 @@ public interface ServerConsumer extends Consumer, ConsumerInfo {
void reject(long messageID) throws Exception;
- void individualCancel(long messageID, boolean failed, boolean sorted) throws Exception;
+ void individualCancel(long messageID, boolean failed) throws Exception;
void forceDelivery(long sequence);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
index 8a34ca3..4458ac9 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
@@ -355,7 +355,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
refqueue = ref.getQueue();
try {
- refqueue.cancel(ref, timeBase, false);
+ refqueue.cancel(ref, timeBase);
} catch (Exception e) {
// There isn't much we can do besides log an error
ActiveMQServerLogger.LOGGER.errorCancellingRefOnBridge(e, ref);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
index 1df1bce..02031e7 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
@@ -17,6 +17,7 @@
package org.apache.activemq.artemis.core.server.impl;
import java.util.Collections;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -193,6 +194,18 @@ public class LastValueQueue extends QueueImpl {
}
}
+ /** Delegates to {@link #addHead(MessageReference, boolean)}: an LVQ has to use the regular addHead because of its last-value calculations. */
+ @Override
+ public void addSorted(MessageReference ref, boolean scheduling) {
+ this.addHead(ref, scheduling);
+ }
+
+ /** List variant: delegates to addHead(refs, scheduling), as an LVQ has to use the regular addHead because of its last-value calculations. */
+ @Override
+ public void addSorted(List<MessageReference> refs, boolean scheduling) {
+ this.addHead(refs, scheduling);
+ }
+
@Override
public synchronized void addHead(final MessageReference ref, boolean scheduling) {
// we first need to check redelivery-delay, as we can't put anything on headers if redelivery-delay
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 44e1a3f..809c00c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -1115,13 +1115,18 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
enterCritical(CRITICAL_PATH_ADD_HEAD);
synchronized (this) {
try {
- if (!scheduling && scheduledDeliveryHandler.checkAndSchedule(ref, false)) {
- return;
+ if (ringSize != -1) {
+ enforceRing(ref, false, true);
}
- internalAddSorted(ref);
+ if (!ref.isAlreadyAcked()) {
+ if (!scheduling && scheduledDeliveryHandler.checkAndSchedule(ref, false)) {
+ return;
+ }
+ internalAddSorted(ref);
- directDeliver = false;
+ directDeliver = false;
+ }
} finally {
leaveCritical(CRITICAL_PATH_ADD_HEAD);
}
@@ -1948,15 +1953,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
@Override
- public synchronized void cancel(final MessageReference reference, final long timeBase, boolean sorted) throws Exception {
+ public synchronized void cancel(final MessageReference reference, final long timeBase) throws Exception {
Pair<Boolean, Boolean> redeliveryResult = checkRedelivery(reference, timeBase, false);
if (redeliveryResult.getA()) {
if (!scheduledDeliveryHandler.checkAndSchedule(reference, false)) {
- if (sorted) {
- internalAddSorted(reference);
- } else {
- internalAddHead(reference);
- }
+ internalAddSorted(reference);
}
resetAllIterators();
@@ -2862,6 +2863,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
int priority = getPriority(ref);
messageReferences.addSorted(ref, priority);
+
+ ref.setInDelivery(false);
}
private int getPriority(MessageReference ref) {
@@ -3933,10 +3936,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
void postRollback(final LinkedList<MessageReference> refs) {
- postRollback(refs, false);
- }
-
- void postRollback(final LinkedList<MessageReference> refs, boolean sorted) {
//if we have purged then ignore adding the messages back
if (purgeOnNoConsumers && getConsumerCount() == 0) {
purgeAfterRollback(refs);
@@ -3946,11 +3945,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
// if the queue is non-destructive then any ack is ignored so no need to add messages back onto the queue
if (!isNonDestructive()) {
- if (sorted) {
- addSorted(refs, false);
- } else {
- addHead(refs, false);
- }
+ addSorted(refs, false);
}
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
index 054ba73..c50a06d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
@@ -84,11 +84,6 @@ public class RefsOperation extends TransactionOperationAbstract {
@Override
public void afterRollback(final Transaction tx) {
- afterRollback(tx, false);
- }
-
- @Override
- public void afterRollback(final Transaction tx, boolean sorted) {
Map<QueueImpl, LinkedList<MessageReference>> queueMap = new HashMap<>();
long timeBase = System.currentTimeMillis();
@@ -121,7 +116,7 @@ public class RefsOperation extends TransactionOperationAbstract {
QueueImpl queue = entry.getKey();
synchronized (queue) {
- queue.postRollback(refs, sorted);
+ queue.postRollback(refs);
}
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index 15864db..f7dbf9a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -539,12 +539,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
}
@Override
- public void close(final boolean failed) throws Exception {
- close(failed, false);
- }
-
- @Override
- public synchronized void close(final boolean failed, boolean sorted) throws Exception {
+ public synchronized void close(final boolean failed) throws Exception {
// Close should only ever be done once per consumer.
if (isClosed) return;
@@ -570,7 +565,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
List<MessageReference> refs = cancelRefs(failed, false, null);
- Transaction tx = new TransactionImpl(storageManager, sorted);
+ Transaction tx = new TransactionImpl(storageManager);
refs.forEach(ref -> {
if (logger.isTraceEnabled()) {
@@ -1022,7 +1017,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
}
@Override
- public synchronized void individualCancel(final long messageID, boolean failed, boolean sorted) throws Exception {
+ public synchronized void individualCancel(final long messageID, boolean failed) throws Exception {
if (browseOnly) {
return;
}
@@ -1037,7 +1032,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
ref.decrementDeliveryCount();
}
- ref.getQueue().cancel(ref, System.currentTimeMillis(), sorted);
+ ref.getQueue().cancel(ref, System.currentTimeMillis());
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index f66a57f..dbc68f2 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -1268,7 +1268,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
ServerConsumer consumer = locateConsumer(consumerID);
if (consumer != null) {
- consumer.individualCancel(messageID, failed, false);
+ consumer.individualCancel(messageID, failed);
}
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionOperation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionOperation.java
index 5c7e7e6..5da1d97 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionOperation.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionOperation.java
@@ -52,10 +52,6 @@ public interface TransactionOperation {
*/
void afterRollback(Transaction tx);
- default void afterRollback(Transaction tx, boolean sorted) {
- afterRollback(tx);
- }
-
List<MessageReference> getRelatedMessageReferences();
List<MessageReference> getListOnConsumer(long consumerID);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
index 8e22bbb..e14d31d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
@@ -63,8 +63,6 @@ public class TransactionImpl implements Transaction {
private final long createTime;
- private final boolean sorted;
-
private volatile boolean containsPersistent;
private int timeoutSeconds = -1;
@@ -98,34 +96,23 @@ public class TransactionImpl implements Transaction {
}
public TransactionImpl(final StorageManager storageManager, final int timeoutSeconds) {
- this(storageManager.generateID(), null, storageManager, timeoutSeconds, false);
+ this(storageManager.generateID(), null, storageManager, timeoutSeconds);
}
public TransactionImpl(final StorageManager storageManager) {
- this(storageManager, false);
+ this(storageManager.generateID(), null, storageManager,-1);
}
- public TransactionImpl(final StorageManager storageManager, boolean sorted) {
- this(storageManager.generateID(), null, storageManager,-1, sorted);
- }
public TransactionImpl(final Xid xid, final StorageManager storageManager, final int timeoutSeconds) {
- this(storageManager.generateID(), xid, storageManager, timeoutSeconds, false);
- }
-
- public TransactionImpl(final Xid xid, final StorageManager storageManager, final int timeoutSeconds, final boolean sorted) {
- this(storageManager.generateID(), xid, storageManager, timeoutSeconds, sorted);
+ this(storageManager.generateID(), xid, storageManager, timeoutSeconds);
}
public TransactionImpl(final long id, final Xid xid, final StorageManager storageManager) {
- this(id, xid, storageManager, -1, false);
+ this(id, xid, storageManager, -1);
}
- public TransactionImpl(final long id, final Xid xid, final StorageManager storageManager, boolean sorted) {
- this(id, xid, storageManager, -1, sorted);
- }
-
- private TransactionImpl(final long id, final Xid xid, final StorageManager storageManager, final int timeoutSeconds, boolean sorted) {
+ private TransactionImpl(final long id, final Xid xid, final StorageManager storageManager, final int timeoutSeconds) {
this.storageManager = storageManager;
this.xid = xid;
@@ -135,8 +122,6 @@ public class TransactionImpl implements Transaction {
this.createTime = System.currentTimeMillis();
this.timeoutSeconds = timeoutSeconds;
-
- this.sorted = sorted;
}
// Transaction implementation
@@ -217,7 +202,7 @@ public class TransactionImpl implements Transaction {
logger.trace("TransactionImpl::prepare::rollbackonly, rollingback " + this);
}
- internalRollback(sorted);
+ internalRollback();
if (exception != null) {
throw exception;
@@ -276,7 +261,7 @@ public class TransactionImpl implements Transaction {
return;
}
if (state == State.ROLLBACK_ONLY) {
- internalRollback(sorted);
+ internalRollback();
if (exception != null) {
throw exception;
@@ -367,7 +352,7 @@ public class TransactionImpl implements Transaction {
}
if (state != State.PREPARED) {
try {
- internalRollback(sorted);
+ internalRollback();
} catch (Exception e) {
// nothing we can do beyond logging
// no need to special handler here as this was not even supposed to happen at this point
@@ -400,11 +385,11 @@ public class TransactionImpl implements Transaction {
}
}
- internalRollback(sorted);
+ internalRollback();
}
}
- private void internalRollback(boolean sorted) throws Exception {
+ private void internalRollback() throws Exception {
if (logger.isTraceEnabled()) {
logger.trace("TransactionImpl::internalRollback " + this);
}
@@ -439,7 +424,7 @@ public class TransactionImpl implements Transaction {
@Override
public void done() {
- afterRollback(operationsToComplete, sorted);
+ afterRollback(operationsToComplete);
}
});
@@ -453,7 +438,7 @@ public class TransactionImpl implements Transaction {
@Override
public void done() {
- afterRollback(storeOperationsToComplete, sorted);
+ afterRollback(storeOperationsToComplete);
}
});
}
@@ -583,10 +568,10 @@ public class TransactionImpl implements Transaction {
}
}
- private synchronized void afterRollback(List<TransactionOperation> operationsToComplete, boolean sorted) {
+ private synchronized void afterRollback(List<TransactionOperation> operationsToComplete) {
if (operationsToComplete != null) {
for (TransactionOperation operation : operationsToComplete) {
- operation.afterRollback(this, sorted);
+ operation.afterRollback(this);
}
// Help out GC here
operationsToComplete.clear();
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index 5d3e1d1..0fcd2b4 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -1217,7 +1217,7 @@ public class ScheduledDeliveryHandlerTest extends Assert {
}
@Override
- public void cancel(MessageReference reference, long timeBase, boolean backInPlace) throws Exception {
+ public void cancel(MessageReference reference, long timeBase) throws Exception {
}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
index a822423..1693036 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
@@ -92,11 +92,6 @@ public class DummyServerConsumer implements ServerConsumer {
}
@Override
- public void close(boolean failed, boolean sorted) throws Exception {
-
- }
-
- @Override
public void removeItself() throws Exception {
}
@@ -156,7 +151,7 @@ public class DummyServerConsumer implements ServerConsumer {
}
@Override
- public void individualCancel(long messageID, boolean failed, boolean sorted) throws Exception {
+ public void individualCancel(long messageID, boolean failed) throws Exception {
}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JMSOrderTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JMSOrderTest.java
index e883bf6..333bcda 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JMSOrderTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JMSOrderTest.java
@@ -30,8 +30,15 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.JMSTestBase;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -177,4 +184,101 @@ public class JMSOrderTest extends JMSTestBase {
}
+ @Test
+ public void testMultipleConsumersRollback() throws Exception {
+ internalMultipleConsumers(true); // rollback=true: each consumer rolls back its session after a receive
+ }
+
+ @Test
+ public void testMultipleConsumersClose() throws Exception {
+ internalMultipleConsumers(false); // rollback=false: consumers only receive, then close without committing
+ }
+
+ private void internalMultipleConsumers(final boolean rollback) throws Exception {
+ // Fills a non-durable ANYCAST queue, lets several transacted consumers receive concurrently without ever committing,
+ // then asserts that a fresh consumer still receives every message in the order given by the int property "i".
+ org.apache.activemq.artemis.core.server.Queue serverQueue = server.createQueue(new QueueConfiguration(getName()).setRoutingType(RoutingType.ANYCAST).setDurable(false));
+
+ int numberOfMessages = 100;
+ int numberOfConsumers = 3;
+
+ ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
+ final javax.jms.Queue jmsQueue;
+ // Produce the messages, each tagged with its send index in the int property "i".
+ try (Connection connection = factory.createConnection()) {
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ jmsQueue = session.createQueue(getName());
+ MessageProducer producer = session.createProducer(jmsQueue);
+
+ for (int i = 0; i < numberOfMessages; i++) {
+ TextMessage message = session.createTextMessage("test " + i);
+ message.setIntProperty("i", i);
+ producer.send(message);
+ }
+ }
+
+ Wait.assertEquals(numberOfMessages, serverQueue::getMessageCount);
+ // Consumer task: transacted session, no commit. With rollback=true it rolls back after each receive and stops after 3 rollbacks; otherwise it receives until receive(500) returns null. The connection is closed on exit either way.
+ AtomicBoolean running = new AtomicBoolean(true);
+ AtomicInteger errors = new AtomicInteger(0);
+ Runnable r = () -> {
+ try (Connection c = factory.createConnection()) {
+ Session s = c.createSession(true, Session.SESSION_TRANSACTED);
+ MessageConsumer cs = s.createConsumer(jmsQueue);
+ c.start();
+ int rollbacks = 0;
+ while (running.get()) {
+ TextMessage txt = (TextMessage)cs.receive(500);
+ if (txt != null) {
+ if (rollback) {
+ s.rollback();
+ rollbacks++;
+
+ if (rollbacks >= 3) {
+ break;
+ }
+ }
+ } else {
+ return;
+ }
+ }
+ } catch (Throwable e) {
+ e.printStackTrace();
+ errors.incrementAndGet();
+ running.set(false); // makes the other consumer loops exit as well
+ }
+ };
+ // Run numberOfConsumers copies of the task on their own threads and wait for all of them to finish.
+ Thread[] threads = new Thread[numberOfConsumers];
+
+ for (int i = 0; i < numberOfConsumers; i++) {
+ threads[i] = new Thread(r, "consumer " + i);
+ threads[i].start();
+ }
+
+ for (Thread t : threads) {
+ t.join();
+ }
+ // No consumer thread may have failed.
+ Assert.assertEquals(0, errors.get());
+ // Nothing was committed above, so the queue is expected to report every message again.
+ Wait.assertEquals(numberOfMessages, serverQueue::getMessageCount);
+ // A single new consumer must now receive the messages with "i" running from 0 to numberOfMessages - 1.
+ try (Connection c = factory.createConnection()) {
+ Session s = c.createSession(true, Session.SESSION_TRANSACTED);
+ MessageConsumer cs = s.createConsumer(jmsQueue);
+ c.start();
+
+ for (int i = 0; i < numberOfMessages; i++) {
+ TextMessage message = (TextMessage) cs.receive(1000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals(i, message.getIntProperty("i"));
+ }
+ // ...and nothing beyond the messages that were sent.
+ Assert.assertNull(cs.receiveNoWait());
+ }
+
+ }
+
+
}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/RingQueueTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/RingQueueTest.java
index 3cfaf98..8f7ae9f 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/RingQueueTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/RingQueueTest.java
@@ -149,12 +149,12 @@ public class RingQueueTest extends ActiveMQTestBase {
producer.send(message);
message = createTextMessage(clientSession, "hello1");
producer.send(message);
- Wait.assertTrue(() -> queue.getMessageCount() == 2);
- Wait.assertTrue(() -> queue.getDeliveringCount() == 2);
+ Wait.assertEquals(2, queue::getMessageCount);
+ Wait.assertEquals(2, queue::getDeliveringCount);
consumer.close();
- Wait.assertTrue(() -> queue.getMessageCount() == 1);
- Wait.assertTrue(() -> queue.getDeliveringCount() == 0);
- Wait.assertTrue(() -> queue.getMessagesReplaced() == 1);
+ Wait.assertEquals(1, queue::getMessageCount);
+ Wait.assertEquals(0, queue::getDeliveringCount);
+ Wait.assertEquals(1, queue::getMessagesReplaced);
consumer = clientSession.createConsumer(qName);
message = consumer.receiveImmediate();
assertNotNull(message);
@@ -242,13 +242,20 @@ public class RingQueueTest extends ActiveMQTestBase {
message.acknowledge();
}
consumer.close();
- Wait.assertTrue(() -> queue.getMessageCount() == 5);
+ Wait.assertEquals(5, queue::getMessageCount);
- for (int i = 0; i < 10; i++) {
+ for (int i = 0; i < 5; i++) {
producer.send(clientSession.createMessage(true));
}
- Wait.assertTrue(() -> queue.getMessageCount() == 10);
- Wait.assertTrue(() -> queue.getMessagesReplaced() == 5);
+ Wait.assertEquals(10, queue::getMessageCount);
+
+ // these sends will be replacing the old values
+ for (int i = 0; i < 5; i++) {
+ producer.send(clientSession.createMessage(true));
+ Wait.assertEquals(10, queue::getMessageCount);
+ }
+
+ Wait.assertEquals(5, queue::getMessagesReplaced);
consumer = clientSession.createConsumer(qName);
message = consumer.receiveImmediate();
assertNotNull(message);
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
index cc88b53..cb2aff6 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
@@ -423,7 +423,7 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
}
@Override
- public void cancel(final MessageReference reference, final long timeBase, boolean sorted) throws Exception {
+ public void cancel(final MessageReference reference, final long timeBase) throws Exception {
// no-op
}