You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2021/02/22 22:32:55 UTC

[activemq-artemis] branch master updated: ARTEMIS-3093 Ordering on multiple consumers and core with rollback

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new 12c8096  ARTEMIS-3093 Ordering on multiple consumers and core with rollback
     new 4de4329c This closes #3463
12c8096 is described below

commit 12c8096a23840ede9c364cc184dddfe19846e2e0
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Mon Feb 22 09:36:55 2021 -0500

    ARTEMIS-3093 Ordering on multiple consumers and core with rollback
---
 .../protocol/amqp/broker/AMQPSessionCallback.java  |   4 +-
 .../proton/transaction/ProtonTransactionImpl.java  |   2 +-
 .../core/protocol/mqtt/MQTTPublishManager.java     |   2 +-
 .../apache/activemq/artemis/core/server/Queue.java |   3 +-
 .../artemis/core/server/ServerConsumer.java        |   4 +-
 .../core/server/cluster/impl/BridgeImpl.java       |   2 +-
 .../artemis/core/server/impl/LastValueQueue.java   |  13 +++
 .../artemis/core/server/impl/QueueImpl.java        |  33 +++----
 .../artemis/core/server/impl/RefsOperation.java    |   7 +-
 .../core/server/impl/ServerConsumerImpl.java       |  13 +--
 .../core/server/impl/ServerSessionImpl.java        |   2 +-
 .../core/transaction/TransactionOperation.java     |   4 -
 .../core/transaction/impl/TransactionImpl.java     |  43 +++------
 .../server/impl/ScheduledDeliveryHandlerTest.java  |   2 +-
 .../tests/integration/cli/DummyServerConsumer.java |   7 +-
 .../tests/integration/client/JMSOrderTest.java     | 104 +++++++++++++++++++++
 .../tests/integration/server/RingQueueTest.java    |  25 +++--
 .../tests/unit/core/postoffice/impl/FakeQueue.java |   2 +-
 18 files changed, 177 insertions(+), 95 deletions(-)

diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index eb62855..f0fb1ef 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -398,7 +398,7 @@ public class AMQPSessionCallback implements SessionCallback {
 
    public void closeSender(final Object brokerConsumer) throws Exception {
       final ServerConsumer consumer = ((ServerConsumer) brokerConsumer);
-      consumer.close(false, true);
+      consumer.close(false);
       consumer.getQueue().recheckRefCount(serverSession.getSessionContext());
    }
 
@@ -440,7 +440,7 @@ public class AMQPSessionCallback implements SessionCallback {
    public void cancel(Object brokerConsumer, Message message, boolean updateCounts) throws Exception {
       OperationContext oldContext = recoverContext();
       try {
-         ((ServerConsumer) brokerConsumer).individualCancel(message.getMessageID(), updateCounts, true);
+         ((ServerConsumer) brokerConsumer).individualCancel(message.getMessageID(), updateCounts);
          ((ServerConsumer) brokerConsumer).getQueue().forceDelivery();
       } finally {
          resetContext(oldContext);
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionImpl.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionImpl.java
index 83128e1..123dbb5 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionImpl.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionImpl.java
@@ -50,7 +50,7 @@ public class ProtonTransactionImpl extends TransactionImpl {
    private boolean discharged;
 
    public ProtonTransactionImpl(final Xid xid, final StorageManager storageManager, final int timeoutSeconds, final AMQPConnectionContext connection) {
-      super(xid, storageManager, timeoutSeconds, true);
+      super(xid, storageManager, timeoutSeconds);
       addOperation(new TransactionOperationAbstract() {
          @Override
          public void afterCommit(Transaction tx) {
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
index 4b89636..5d9c96d 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
@@ -133,7 +133,7 @@ public class MQTTPublishManager {
             sendServerMessage(mqttid, message, deliveryCount, qos);
          } else {
             // Client must have disconnected and it's Subscription QoS cleared
-            consumer.individualCancel(message.getMessageID(), false, true);
+            consumer.individualCancel(message.getMessageID(), false);
          }
       }
    }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
index 628e43e..5b1d128 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
@@ -220,8 +220,7 @@ public interface Queue extends Bindable,CriticalComponent {
 
    void cancel(Transaction tx, MessageReference ref, boolean ignoreRedeliveryCheck);
 
-   /** @param sorted it should use the messageID as a reference to where to add it in the queue */
-   void cancel(MessageReference reference, long timeBase, boolean sorted) throws Exception;
+   void cancel(MessageReference reference, long timeBase) throws Exception;
 
    void deliverAsync();
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java
index 8528f68..e743c04 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java
@@ -64,8 +64,6 @@ public interface ServerConsumer extends Consumer, ConsumerInfo {
 
    void close(boolean failed) throws Exception;
 
-   void close(boolean failed, boolean sorted) throws Exception;
-
    /**
     * This method is just to remove itself from Queues.
     * If for any reason during a close an exception occurred, the exception treatment
@@ -101,7 +99,7 @@ public interface ServerConsumer extends Consumer, ConsumerInfo {
 
    void reject(long messageID) throws Exception;
 
-   void individualCancel(long messageID, boolean failed, boolean sorted) throws Exception;
+   void individualCancel(long messageID, boolean failed) throws Exception;
 
    void forceDelivery(long sequence);
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
index 8a34ca3..4458ac9 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
@@ -355,7 +355,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
          refqueue = ref.getQueue();
 
          try {
-            refqueue.cancel(ref, timeBase, false);
+            refqueue.cancel(ref, timeBase);
          } catch (Exception e) {
             // There isn't much we can do besides log an error
             ActiveMQServerLogger.LOGGER.errorCancellingRefOnBridge(e, ref);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
index 1df1bce..02031e7 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.artemis.core.server.impl;
 
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -193,6 +194,18 @@ public class LastValueQueue extends QueueImpl {
       }
    }
 
+   /** LVQ has to use regular addHead due to last value queues calculations */
+   @Override
+   public void addSorted(MessageReference ref, boolean scheduling) {
+      this.addHead(ref, scheduling);
+   }
+
+   /** LVQ has to use regular addHead due to last value queues calculations */
+   @Override
+   public void addSorted(List<MessageReference> refs, boolean scheduling) {
+      this.addHead(refs, scheduling);
+   }
+
    @Override
    public synchronized void addHead(final MessageReference ref, boolean scheduling) {
       // we first need to check redelivery-delay, as we can't put anything on headers if redelivery-delay
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 44e1a3f..809c00c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -1115,13 +1115,18 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       enterCritical(CRITICAL_PATH_ADD_HEAD);
       synchronized (this) {
          try {
-            if (!scheduling && scheduledDeliveryHandler.checkAndSchedule(ref, false)) {
-               return;
+            if (ringSize != -1) {
+               enforceRing(ref, false, true);
             }
 
-            internalAddSorted(ref);
+            if (!ref.isAlreadyAcked()) {
+               if (!scheduling && scheduledDeliveryHandler.checkAndSchedule(ref, false)) {
+                  return;
+               }
+               internalAddSorted(ref);
 
-            directDeliver = false;
+               directDeliver = false;
+            }
          } finally {
             leaveCritical(CRITICAL_PATH_ADD_HEAD);
          }
@@ -1948,15 +1953,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
    }
 
    @Override
-   public synchronized void cancel(final MessageReference reference, final long timeBase, boolean sorted) throws Exception {
+   public synchronized void cancel(final MessageReference reference, final long timeBase) throws Exception {
       Pair<Boolean, Boolean> redeliveryResult = checkRedelivery(reference, timeBase, false);
       if (redeliveryResult.getA()) {
          if (!scheduledDeliveryHandler.checkAndSchedule(reference, false)) {
-            if (sorted) {
-               internalAddSorted(reference);
-            } else {
-               internalAddHead(reference);
-            }
+            internalAddSorted(reference);
          }
 
          resetAllIterators();
@@ -2862,6 +2863,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       int priority = getPriority(ref);
 
       messageReferences.addSorted(ref, priority);
+
+      ref.setInDelivery(false);
    }
 
    private int getPriority(MessageReference ref) {
@@ -3933,10 +3936,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
    }
 
    void postRollback(final LinkedList<MessageReference> refs) {
-      postRollback(refs, false);
-   }
-
-   void postRollback(final LinkedList<MessageReference> refs, boolean sorted) {
       //if we have purged then ignore adding the messages back
       if (purgeOnNoConsumers && getConsumerCount() == 0) {
          purgeAfterRollback(refs);
@@ -3946,11 +3945,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
       // if the queue is non-destructive then any ack is ignored so no need to add messages back onto the queue
       if (!isNonDestructive()) {
-         if (sorted) {
-            addSorted(refs, false);
-         } else {
-            addHead(refs, false);
-         }
+         addSorted(refs, false);
       }
    }
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
index 054ba73..c50a06d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
@@ -84,11 +84,6 @@ public class RefsOperation extends TransactionOperationAbstract {
 
    @Override
    public void afterRollback(final Transaction tx) {
-      afterRollback(tx, false);
-   }
-
-   @Override
-   public void afterRollback(final Transaction tx, boolean sorted) {
       Map<QueueImpl, LinkedList<MessageReference>> queueMap = new HashMap<>();
 
       long timeBase = System.currentTimeMillis();
@@ -121,7 +116,7 @@ public class RefsOperation extends TransactionOperationAbstract {
          QueueImpl queue = entry.getKey();
 
          synchronized (queue) {
-            queue.postRollback(refs, sorted);
+            queue.postRollback(refs);
          }
       }
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index 15864db..f7dbf9a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -539,12 +539,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
    }
 
    @Override
-   public void close(final boolean failed) throws Exception {
-      close(failed, false);
-   }
-
-   @Override
-   public synchronized void close(final boolean failed, boolean sorted) throws Exception {
+   public synchronized void close(final boolean failed) throws Exception {
 
       // Close should only ever be done once per consumer.
       if (isClosed) return;
@@ -570,7 +565,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
 
       List<MessageReference> refs = cancelRefs(failed, false, null);
 
-      Transaction tx = new TransactionImpl(storageManager, sorted);
+      Transaction tx = new TransactionImpl(storageManager);
 
       refs.forEach(ref -> {
          if (logger.isTraceEnabled()) {
@@ -1022,7 +1017,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
    }
 
    @Override
-   public synchronized void individualCancel(final long messageID, boolean failed, boolean sorted) throws Exception {
+   public synchronized void individualCancel(final long messageID, boolean failed) throws Exception {
       if (browseOnly) {
          return;
       }
@@ -1037,7 +1032,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
          ref.decrementDeliveryCount();
       }
 
-      ref.getQueue().cancel(ref, System.currentTimeMillis(), sorted);
+      ref.getQueue().cancel(ref, System.currentTimeMillis());
    }
 
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index f66a57f..dbc68f2 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -1268,7 +1268,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
       ServerConsumer consumer = locateConsumer(consumerID);
 
       if (consumer != null) {
-         consumer.individualCancel(messageID, failed, false);
+         consumer.individualCancel(messageID, failed);
       }
 
    }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionOperation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionOperation.java
index 5c7e7e6..5da1d97 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionOperation.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionOperation.java
@@ -52,10 +52,6 @@ public interface TransactionOperation {
     */
    void afterRollback(Transaction tx);
 
-   default void afterRollback(Transaction tx, boolean sorted) {
-      afterRollback(tx);
-   }
-
    List<MessageReference> getRelatedMessageReferences();
 
    List<MessageReference> getListOnConsumer(long consumerID);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
index 8e22bbb..e14d31d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
@@ -63,8 +63,6 @@ public class TransactionImpl implements Transaction {
 
    private final long createTime;
 
-   private final boolean sorted;
-
    private volatile boolean containsPersistent;
 
    private int timeoutSeconds = -1;
@@ -98,34 +96,23 @@ public class TransactionImpl implements Transaction {
    }
 
    public TransactionImpl(final StorageManager storageManager, final int timeoutSeconds) {
-      this(storageManager.generateID(), null, storageManager, timeoutSeconds, false);
+      this(storageManager.generateID(), null, storageManager, timeoutSeconds);
    }
 
    public TransactionImpl(final StorageManager storageManager) {
-      this(storageManager, false);
+      this(storageManager.generateID(), null, storageManager,-1);
    }
 
-   public TransactionImpl(final StorageManager storageManager, boolean sorted) {
-      this(storageManager.generateID(), null, storageManager,-1, sorted);
-   }
 
    public TransactionImpl(final Xid xid, final StorageManager storageManager, final int timeoutSeconds) {
-      this(storageManager.generateID(), xid, storageManager, timeoutSeconds, false);
-   }
-
-   public TransactionImpl(final Xid xid, final StorageManager storageManager, final int timeoutSeconds, final boolean sorted) {
-      this(storageManager.generateID(), xid, storageManager, timeoutSeconds, sorted);
+      this(storageManager.generateID(), xid, storageManager, timeoutSeconds);
    }
 
    public TransactionImpl(final long id, final Xid xid, final StorageManager storageManager) {
-      this(id, xid, storageManager, -1, false);
+      this(id, xid, storageManager, -1);
    }
 
-   public TransactionImpl(final long id, final Xid xid, final StorageManager storageManager, boolean sorted) {
-      this(id, xid, storageManager, -1, sorted);
-   }
-
-   private TransactionImpl(final long id, final Xid xid, final StorageManager storageManager, final int timeoutSeconds, boolean sorted) {
+   private TransactionImpl(final long id, final Xid xid, final StorageManager storageManager, final int timeoutSeconds) {
       this.storageManager = storageManager;
 
       this.xid = xid;
@@ -135,8 +122,6 @@ public class TransactionImpl implements Transaction {
       this.createTime = System.currentTimeMillis();
 
       this.timeoutSeconds = timeoutSeconds;
-
-      this.sorted = sorted;
    }
 
    // Transaction implementation
@@ -217,7 +202,7 @@ public class TransactionImpl implements Transaction {
                   logger.trace("TransactionImpl::prepare::rollbackonly, rollingback " + this);
                }
 
-               internalRollback(sorted);
+               internalRollback();
 
                if (exception != null) {
                   throw exception;
@@ -276,7 +261,7 @@ public class TransactionImpl implements Transaction {
             return;
          }
          if (state == State.ROLLBACK_ONLY) {
-            internalRollback(sorted);
+            internalRollback();
 
             if (exception != null) {
                throw exception;
@@ -367,7 +352,7 @@ public class TransactionImpl implements Transaction {
          }
          if (state != State.PREPARED) {
             try {
-               internalRollback(sorted);
+               internalRollback();
             } catch (Exception e) {
                // nothing we can do beyond logging
                // no need to special handler here as this was not even supposed to happen at this point
@@ -400,11 +385,11 @@ public class TransactionImpl implements Transaction {
             }
          }
 
-         internalRollback(sorted);
+         internalRollback();
       }
    }
 
-   private void internalRollback(boolean sorted) throws Exception {
+   private void internalRollback() throws Exception {
       if (logger.isTraceEnabled()) {
          logger.trace("TransactionImpl::internalRollback " + this);
       }
@@ -439,7 +424,7 @@ public class TransactionImpl implements Transaction {
 
          @Override
          public void done() {
-            afterRollback(operationsToComplete, sorted);
+            afterRollback(operationsToComplete);
          }
       });
 
@@ -453,7 +438,7 @@ public class TransactionImpl implements Transaction {
 
             @Override
             public void done() {
-               afterRollback(storeOperationsToComplete, sorted);
+               afterRollback(storeOperationsToComplete);
             }
          });
       }
@@ -583,10 +568,10 @@ public class TransactionImpl implements Transaction {
       }
    }
 
-   private synchronized void afterRollback(List<TransactionOperation> operationsToComplete, boolean sorted) {
+   private synchronized void afterRollback(List<TransactionOperation> operationsToComplete) {
       if (operationsToComplete != null) {
          for (TransactionOperation operation : operationsToComplete) {
-            operation.afterRollback(this, sorted);
+            operation.afterRollback(this);
          }
          // Help out GC here
          operationsToComplete.clear();
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index 5d3e1d1..0fcd2b4 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -1217,7 +1217,7 @@ public class ScheduledDeliveryHandlerTest extends Assert {
       }
 
       @Override
-      public void cancel(MessageReference reference, long timeBase, boolean backInPlace) throws Exception {
+      public void cancel(MessageReference reference, long timeBase) throws Exception {
 
       }
 
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
index a822423..1693036 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
@@ -92,11 +92,6 @@ public class DummyServerConsumer implements ServerConsumer {
    }
 
    @Override
-   public void close(boolean failed, boolean sorted) throws Exception {
-
-   }
-
-   @Override
    public void removeItself() throws Exception {
 
    }
@@ -156,7 +151,7 @@ public class DummyServerConsumer implements ServerConsumer {
    }
 
    @Override
-   public void individualCancel(long messageID, boolean failed, boolean sorted) throws Exception {
+   public void individualCancel(long messageID, boolean failed) throws Exception {
 
    }
 
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JMSOrderTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JMSOrderTest.java
index e883bf6..333bcda 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JMSOrderTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JMSOrderTest.java
@@ -30,8 +30,15 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.tests.util.CFUtil;
 import org.apache.activemq.artemis.tests.util.JMSTestBase;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -177,4 +184,101 @@ public class JMSOrderTest extends JMSTestBase {
 
    }
 
+   @Test
+   public void testMultipleConsumersRollback() throws Exception {
+      internalMultipleConsumers(true);
+   }
+
+   @Test
+   public void testMultipleConsumersClose() throws Exception {
+      internalMultipleConsumers(false);
+   }
+
+   private void internalMultipleConsumers(final boolean rollback) throws Exception {
+
+
+      org.apache.activemq.artemis.core.server.Queue serverQueue = server.createQueue(new QueueConfiguration(getName()).setRoutingType(RoutingType.ANYCAST).setDurable(false));
+
+      int numberOfMessages = 100;
+      int numberOfConsumers = 3;
+
+      ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
+      final javax.jms.Queue jmsQueue;
+
+      try (Connection connection = factory.createConnection()) {
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         jmsQueue = session.createQueue(getName());
+         MessageProducer producer = session.createProducer(jmsQueue);
+
+         for (int i = 0; i < numberOfMessages; i++) {
+            TextMessage message = session.createTextMessage("test " + i);
+            message.setIntProperty("i", i);
+            producer.send(message);
+         }
+      }
+
+      Wait.assertEquals(numberOfMessages, serverQueue::getMessageCount);
+
+      AtomicBoolean running = new AtomicBoolean(true);
+      AtomicInteger errors = new AtomicInteger(0);
+      Runnable r = () -> {
+         try (Connection c = factory.createConnection()) {
+            Session s = c.createSession(true, Session.SESSION_TRANSACTED);
+            MessageConsumer cs = s.createConsumer(jmsQueue);
+            c.start();
+            int rollbacks = 0;
+            while (running.get()) {
+               TextMessage txt = (TextMessage)cs.receive(500);
+               if (txt != null) {
+                  if (rollback) {
+                     s.rollback();
+                     rollbacks++;
+
+                     if (rollbacks >= 3) {
+                        break;
+                     }
+                  }
+               } else {
+                  return;
+               }
+            }
+         } catch (Throwable e) {
+            e.printStackTrace();
+            errors.incrementAndGet();
+            running.set(false);
+         }
+      };
+
+      Thread[] threads = new Thread[numberOfConsumers];
+
+      for (int i = 0; i < numberOfConsumers; i++) {
+         threads[i] = new Thread(r, "consumer " + i);
+         threads[i].start();
+      }
+
+      for (Thread t : threads) {
+         t.join();
+      }
+
+      Assert.assertEquals(0, errors.get());
+
+      Wait.assertEquals(numberOfMessages, serverQueue::getMessageCount);
+
+      try (Connection c = factory.createConnection()) {
+         Session s = c.createSession(true, Session.SESSION_TRANSACTED);
+         MessageConsumer cs = s.createConsumer(jmsQueue);
+         c.start();
+
+         for (int i = 0; i < numberOfMessages; i++) {
+            TextMessage message = (TextMessage) cs.receive(1000);
+            Assert.assertNotNull(message);
+            Assert.assertEquals(i, message.getIntProperty("i"));
+         }
+
+         Assert.assertNull(cs.receiveNoWait());
+      }
+
+   }
+
+
 }
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/RingQueueTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/RingQueueTest.java
index 3cfaf98..8f7ae9f 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/RingQueueTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/RingQueueTest.java
@@ -149,12 +149,12 @@ public class RingQueueTest extends ActiveMQTestBase {
       producer.send(message);
       message = createTextMessage(clientSession, "hello1");
       producer.send(message);
-      Wait.assertTrue(() -> queue.getMessageCount() == 2);
-      Wait.assertTrue(() -> queue.getDeliveringCount() == 2);
+      Wait.assertEquals(2, queue::getMessageCount);
+      Wait.assertEquals(2, queue::getDeliveringCount);
       consumer.close();
-      Wait.assertTrue(() -> queue.getMessageCount() == 1);
-      Wait.assertTrue(() -> queue.getDeliveringCount() == 0);
-      Wait.assertTrue(() -> queue.getMessagesReplaced() == 1);
+      Wait.assertEquals(1, queue::getMessageCount);
+      Wait.assertEquals(0,  queue::getDeliveringCount);
+      Wait.assertEquals(1, queue::getMessagesReplaced);
       consumer = clientSession.createConsumer(qName);
       message = consumer.receiveImmediate();
       assertNotNull(message);
@@ -242,13 +242,20 @@ public class RingQueueTest extends ActiveMQTestBase {
          message.acknowledge();
       }
       consumer.close();
-      Wait.assertTrue(() -> queue.getMessageCount() == 5);
+      Wait.assertEquals(5, queue::getMessageCount);
 
-      for (int i = 0; i < 10; i++) {
+      for (int i = 0; i < 5; i++) {
          producer.send(clientSession.createMessage(true));
       }
-      Wait.assertTrue(() -> queue.getMessageCount() == 10);
-      Wait.assertTrue(() -> queue.getMessagesReplaced() == 5);
+      Wait.assertEquals(10, queue::getMessageCount);
+
+      // these sends will be replacing the old values
+      for (int i = 0; i < 5; i++) {
+         producer.send(clientSession.createMessage(true));
+         Wait.assertEquals(10, queue::getMessageCount);
+      }
+
+      Wait.assertEquals(5, queue::getMessagesReplaced);
       consumer = clientSession.createConsumer(qName);
       message = consumer.receiveImmediate();
       assertNotNull(message);
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
index cc88b53..cb2aff6 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
@@ -423,7 +423,7 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
    }
 
    @Override
-   public void cancel(final MessageReference reference, final long timeBase, boolean sorted) throws Exception {
+   public void cancel(final MessageReference reference, final long timeBase) throws Exception {
       // no-op
 
    }