You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2019/08/21 14:09:29 UTC
[activemq-artemis] 01/03: ARTEMIS-2458 Fix AMQP Transaction
Rollback Ordering by using a sorted add
This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
commit 61eb379741ab3faebdffa4d7347507e8bd11d890
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Tue Aug 20 17:05:33 2019 -0400
ARTEMIS-2458 Fix AMQP Transaction Rollback Ordering by using a sorted add
---
.../artemis/utils/collections/LinkedListImpl.java | 62 ++++++++++
.../utils/collections/PriorityLinkedList.java | 2 +
.../utils/collections/PriorityLinkedListImpl.java | 17 ++-
.../protocol/amqp/broker/AMQPSessionCallback.java | 2 +-
.../core/protocol/mqtt/MQTTPublishManager.java | 2 +-
.../apache/activemq/artemis/core/server/Queue.java | 3 +-
.../artemis/core/server/ServerConsumer.java | 2 +-
.../core/server/cluster/impl/BridgeImpl.java | 2 +-
.../core/server/impl/MessageReferenceImpl.java | 23 ++++
.../artemis/core/server/impl/QueueImpl.java | 27 ++++-
.../core/server/impl/ServerConsumerImpl.java | 4 +-
.../core/server/impl/ServerSessionImpl.java | 2 +-
.../server/impl/ScheduledDeliveryHandlerTest.java | 2 +-
.../tests/integration/cli/DummyServerConsumer.java | 2 +-
.../tests/integration/client/JMSOrderTest.java | 132 +++++++++++++++++++++
.../tests/unit/core/postoffice/impl/FakeQueue.java | 2 +-
.../artemis/tests/unit/util/LinkedListTest.java | 73 +++++++++++-
17 files changed, 343 insertions(+), 16 deletions(-)
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java
index cb20258..7426947 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java
@@ -17,6 +17,7 @@
package org.apache.activemq.artemis.utils.collections;
import java.lang.reflect.Array;
+import java.util.Comparator;
import java.util.NoSuchElementException;
import java.util.Objects;
@@ -43,8 +44,15 @@ public class LinkedListImpl<E> implements LinkedList<E> {
private int nextIndex;
+ private final Comparator<E> comparator;
+
public LinkedListImpl() {
+ this(null);
+ }
+
+ public LinkedListImpl(Comparator<E> comparator) {
iters = createIteratorArray(INITIAL_ITERATOR_ARRAY_SIZE);
+ this.comparator = comparator;
}
@Override
@@ -84,6 +92,60 @@ public class LinkedListImpl<E> implements LinkedList<E> {
}
}
+ public void addSorted(E e) {
+ if (comparator == null) {
+ throw new NullPointerException("comparator=null");
+ }
+ if (size == 0) {
+ addHead(e);
+ } else {
+ if (comparator.compare(head.next.val(), e) < 0) {
+ addHead(e);
+ return;
+ }
+
+ // in our usage, most of the times we will just add to the end
+ // as the QueueImpl cancellations in AMQP will return the buffer back to the queue, in the order they were consumed.
+ // There is an exception to that case, when there are more messages on the queue.
+ // This would be an optimization for our usage.
+ // avoiding scanning the entire List just to add at the end, so we compare the end first.
+ if (comparator.compare(tail.val(), e) >= 0) {
+ addTail(e);
+ return;
+ }
+
+ Node<E> fetching = head.next;
+ while (fetching.next != null) {
+ int compareNext = comparator.compare(fetching.next.val(), e);
+ if (compareNext <= 0) {
+ addAfter(fetching, e);
+ return;
+ }
+ fetching = fetching.next;
+ }
+
+ // this shouldn't happen as the tail was compared before iterating
+ // the only possibilities for this to happen are:
+ // - there is a bug on the comparator
+ // - This method is buggy
+ // - The list wasn't properly synchronized as this list does't support concurrent access
+ //
+ // Also I'm not bothering about creating a Logger ID for this, because the only reason for this code to exist
+ // is because my OCD level is not letting this out.
+ throw new IllegalStateException("Cannot find a suitable place for your element, There's a mismatch in the comparator or there was concurrent adccess on the queue");
+ }
+ }
+
+ private void addAfter(Node<E> node, E e) {
+ Node<E> newNode = Node.with(e);
+ Node<E> nextNode = node.next;
+ node.next = newNode;
+ newNode.prev = node;
+ newNode.next = nextNode;
+ nextNode.prev = newNode;
+ size++;
+ }
+
@Override
public E poll() {
Node<E> ret = head.next;
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityLinkedList.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityLinkedList.java
index 19e58c2..9437f55 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityLinkedList.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityLinkedList.java
@@ -27,6 +27,8 @@ public interface PriorityLinkedList<T> {
void addTail(T t, int priority);
+ void addSorted(T t, int priority);
+
T poll();
void clear();
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityLinkedListImpl.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityLinkedListImpl.java
index 00cf046..bde9461 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityLinkedListImpl.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityLinkedListImpl.java
@@ -17,6 +17,7 @@
package org.apache.activemq.artemis.utils.collections;
import java.lang.reflect.Array;
+import java.util.Comparator;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
@@ -40,10 +41,15 @@ public class PriorityLinkedListImpl<T> implements PriorityLinkedList<T> {
private int lastPriority = -1;
public PriorityLinkedListImpl(final int priorities) {
+ this(priorities, null);
+ }
+
+
+ public PriorityLinkedListImpl(final int priorities, Comparator<T> comparator) {
levels = (LinkedListImpl<T>[]) Array.newInstance(LinkedListImpl.class, priorities);
for (int i = 0; i < priorities; i++) {
- levels[i] = new LinkedListImpl<>();
+ levels[i] = new LinkedListImpl<>(comparator);
}
}
@@ -81,6 +87,15 @@ public class PriorityLinkedListImpl<T> implements PriorityLinkedList<T> {
}
@Override
+ public void addSorted(T t, int priority) {
+ checkHighest(priority);
+
+ levels[priority].addSorted(t);
+
+ exclusiveIncrementSize(1);
+ }
+
+ @Override
public T poll() {
T t = null;
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index f850cc1..dc249ff 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -405,7 +405,7 @@ public class AMQPSessionCallback implements SessionCallback {
public void cancel(Object brokerConsumer, Message message, boolean updateCounts) throws Exception {
OperationContext oldContext = recoverContext();
try {
- ((ServerConsumer) brokerConsumer).individualCancel(message.getMessageID(), updateCounts);
+ ((ServerConsumer) brokerConsumer).individualCancel(message.getMessageID(), updateCounts, true);
((ServerConsumer) brokerConsumer).getQueue().forceDelivery();
} finally {
resetContext(oldContext);
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
index abcfe3f..b473a87 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
@@ -131,7 +131,7 @@ public class MQTTPublishManager {
sendServerMessage(mqttid, message, deliveryCount, qos);
} else {
// Client must have disconnected and it's Subscription QoS cleared
- consumer.individualCancel(message.getMessageID(), false);
+ consumer.individualCancel(message.getMessageID(), false, true);
}
}
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
index 2d7f373..bab38d6 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
@@ -172,7 +172,8 @@ public interface Queue extends Bindable,CriticalComponent {
void cancel(Transaction tx, MessageReference ref, boolean ignoreRedeliveryCheck);
- void cancel(MessageReference reference, long timeBase) throws Exception;
+ /** @param sorted it should use the messageID as a reference to where to add it in the queue */
+ void cancel(MessageReference reference, long timeBase, boolean sorted) throws Exception;
void deliverAsync();
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java
index 4d35919..f1f8b1e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java
@@ -98,7 +98,7 @@ public interface ServerConsumer extends Consumer, ConsumerInfo {
void reject(long messageID) throws Exception;
- void individualCancel(long messageID, boolean failed) throws Exception;
+ void individualCancel(long messageID, boolean failed, boolean sorted) throws Exception;
void forceDelivery(long sequence);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
index 7d5bbe8..1024118 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
@@ -354,7 +354,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
refqueue = ref.getQueue();
try {
- refqueue.cancel(ref, timeBase);
+ refqueue.cancel(ref, timeBase, false);
} catch (Exception e) {
// There isn't much we can do besides log an error
ActiveMQServerLogger.LOGGER.errorCancellingRefOnBridge(e, ref);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
index 97bb7f6..459b703 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.core.server.impl;
+import java.util.Comparator;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.Consumer;
@@ -33,6 +34,28 @@ import org.apache.activemq.artemis.utils.collections.LinkedListImpl;
*/
public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceImpl> implements MessageReference, Runnable {
+ private static final MessageReferenceComparatorByID idComparator = new MessageReferenceComparatorByID();
+
+ public static Comparator<MessageReference> getIDComparator() {
+ return idComparator;
+ }
+
+ private static class MessageReferenceComparatorByID implements Comparator<MessageReference> {
+
+ @Override
+ public int compare(MessageReference o1, MessageReference o2) {
+ long value = o2.getMessage().getMessageID() - o1.getMessage().getMessageID();
+ if (value > 0) {
+ return 1;
+ } else if (value < 0) {
+ return -1;
+ } else {
+ return 0;
+ }
+ }
+ }
+
+
private static final AtomicIntegerFieldUpdater<MessageReferenceImpl> DELIVERY_COUNT_UPDATER = AtomicIntegerFieldUpdater
.newUpdater(MessageReferenceImpl.class, "deliveryCount");
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 1c133ef..e19d9ef 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -179,7 +179,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
private final MpscUnboundedArrayQueue<MessageReference> intermediateMessageReferences = new MpscUnboundedArrayQueue<>(8192);
// This is where messages are stored
- private final PriorityLinkedList<MessageReference> messageReferences = new PriorityLinkedListImpl<>(QueueImpl.NUM_PRIORITIES);
+ private final PriorityLinkedList<MessageReference> messageReferences = new PriorityLinkedListImpl<>(QueueImpl.NUM_PRIORITIES, MessageReferenceImpl.getIDComparator());
// The quantity of pagedReferences on messageReferences priority list
private final AtomicInteger pagedReferences = new AtomicInteger(0);
@@ -1631,11 +1631,15 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
@Override
- public synchronized void cancel(final MessageReference reference, final long timeBase) throws Exception {
+ public synchronized void cancel(final MessageReference reference, final long timeBase, boolean sorted) throws Exception {
Pair<Boolean, Boolean> redeliveryResult = checkRedelivery(reference, timeBase, false);
if (redeliveryResult.getA()) {
if (!scheduledDeliveryHandler.checkAndSchedule(reference, false)) {
- internalAddHead(reference);
+ if (sorted) {
+ internalAddSorted(reference);
+ } else {
+ internalAddHead(reference);
+ }
}
resetAllIterators();
@@ -2469,6 +2473,23 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
messageReferences.addHead(ref, priority);
}
+ /**
+ * The caller of this method requires synchronized on the queue.
+ * I'm not going to add synchronized to this method just for a precaution,
+ * as I'm not 100% sure this won't cause any extra runtime.
+ *
+ * @param ref
+ */
+ private void internalAddSorted(final MessageReference ref) {
+ queueMemorySize.addAndGet(ref.getMessageMemoryEstimate());
+ pendingMetrics.incrementMetrics(ref);
+ refAdded(ref);
+
+ int priority = getPriority(ref);
+
+ messageReferences.addSorted(ref, priority);
+ }
+
private int getPriority(MessageReference ref) {
try {
return ref.getMessage().getPriority();
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index 37dd74a..54cf9a2 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -992,7 +992,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
}
@Override
- public synchronized void individualCancel(final long messageID, boolean failed) throws Exception {
+ public synchronized void individualCancel(final long messageID, boolean failed, boolean sorted) throws Exception {
if (browseOnly) {
return;
}
@@ -1007,7 +1007,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
ref.decrementDeliveryCount();
}
- ref.getQueue().cancel(ref, System.currentTimeMillis());
+ ref.getQueue().cancel(ref, System.currentTimeMillis(), sorted);
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index 9044302..296382b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -1146,7 +1146,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
ServerConsumer consumer = locateConsumer(consumerID);
if (consumer != null) {
- consumer.individualCancel(messageID, failed);
+ consumer.individualCancel(messageID, failed, false);
}
}
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index ad4cbb3..ea15264 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -1108,7 +1108,7 @@ public class ScheduledDeliveryHandlerTest extends Assert {
}
@Override
- public void cancel(MessageReference reference, long timeBase) throws Exception {
+ public void cancel(MessageReference reference, long timeBase, boolean backInPlace) throws Exception {
}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
index 58bf2d3..ee7bdbb 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
@@ -150,7 +150,7 @@ public class DummyServerConsumer implements ServerConsumer {
}
@Override
- public void individualCancel(long messageID, boolean failed) throws Exception {
+ public void individualCancel(long messageID, boolean failed, boolean sorted) throws Exception {
}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JMSOrderTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JMSOrderTest.java
new file mode 100644
index 0000000..dcc5a40
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JMSOrderTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.client;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.activemq.artemis.tests.util.JMSTestBase;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.activemq.artemis.tests.util.CFUtil.createConnectionFactory;
+
+@RunWith(value = Parameterized.class)
+public class JMSOrderTest extends JMSTestBase {
+
+ String protocol;
+
+ ConnectionFactory protocolCF;
+
+ public JMSOrderTest(String protocol) {
+ this.protocol = protocol;
+ }
+
+ @Before
+ public void setupCF() {
+ protocolCF = createConnectionFactory(protocol, "tcp://localhost:61616");
+ }
+
+ @Parameterized.Parameters(name = "protocol={0}")
+ public static Collection getParameters() {
+ return Arrays.asList(new Object[][]{{"AMQP"}, {"OPENWIRE"}, {"CORE"}});
+ }
+
+ protected void sendToAmqQueue(int count) throws Exception {
+ Connection activemqConnection = protocolCF.createConnection();
+ Session amqSession = activemqConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue amqTestQueue = amqSession.createQueue(name.getMethodName());
+ sendMessages(activemqConnection, amqTestQueue, count);
+ activemqConnection.close();
+ }
+
+ public void sendMessages(Connection connection, Destination destination, int count) throws Exception {
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer p = session.createProducer(destination);
+
+ for (int i = 1; i <= count; i++) {
+ TextMessage message = session.createTextMessage();
+ message.setText("TextMessage: " + i);
+ message.setIntProperty("nr", i);
+ p.send(message);
+ }
+
+ session.close();
+
+ }
+
+ @Test(timeout = 60000)
+ public void testReceiveSomeThenRollback() throws Exception {
+ Connection connection = protocolCF.createConnection();
+ try {
+ connection.start();
+
+ int totalCount = 5;
+ int consumeBeforeRollback = 2;
+
+ sendToAmqQueue(totalCount);
+
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ Queue queue = session.createQueue(name.getMethodName());
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ for (int i = 1; i <= consumeBeforeRollback; i++) {
+ Message message = consumer.receive(3000);
+ assertNotNull(message);
+ assertEquals("Unexpected message number", i, message.getIntProperty("nr"));
+ }
+
+ session.rollback();
+
+ // Consume again.. the previously consumed messages should get delivered
+ // again after the rollback and then the remainder should follow
+ List<Integer> messageNumbers = new ArrayList<>();
+ for (int i = 1; i <= totalCount; i++) {
+ Message message = consumer.receive(3000);
+ assertNotNull("Failed to receive message: " + i, message);
+ int msgNum = message.getIntProperty("nr");
+ System.out.println("Received " + msgNum);
+ messageNumbers.add(msgNum);
+ }
+
+ session.commit();
+
+ assertEquals("Unexpected size of list", totalCount, messageNumbers.size());
+ for (int i = 0; i < messageNumbers.size(); i++) {
+ assertEquals("Unexpected order of messages: " + messageNumbers, Integer.valueOf(i + 1), messageNumbers.get(i));
+ }
+ } finally {
+ connection.close();
+ }
+
+ }
+
+}
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
index 290aa15..612d621 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
@@ -351,7 +351,7 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
}
@Override
- public void cancel(final MessageReference reference, final long timeBase) throws Exception {
+ public void cancel(final MessageReference reference, final long timeBase, boolean sorted) throws Exception {
// no-op
}
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/LinkedListTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/LinkedListTest.java
index e7c3eba..5d8f21b 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/LinkedListTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/LinkedListTest.java
@@ -17,14 +17,18 @@
package org.apache.activemq.artemis.tests.unit.util;
import java.lang.ref.WeakReference;
+import java.util.Comparator;
+import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.apache.activemq.artemis.utils.collections.LinkedListImpl;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -37,7 +41,74 @@ public class LinkedListTest extends ActiveMQTestBase {
public void setUp() throws Exception {
super.setUp();
- list = new LinkedListImpl<>();
+ list = new LinkedListImpl<>(integerComparator);
+ }
+
+ Comparator<Integer> integerComparator = new Comparator<Integer>() {
+ @Override
+ public int compare(Integer o1, Integer o2) {
+ if (o1.intValue() == o2.intValue()) {
+ return 0;
+ }
+ if (o2.intValue() > o1.intValue()) {
+ return 1;
+ } else {
+ return -1;
+ }
+ }
+ };
+
+ @Test
+ public void addSorted() {
+
+ list.addSorted(1);
+ list.addSorted(3);
+ list.addSorted(2);
+ list.addSorted(0);
+ validateOrder(null);
+ Assert.assertEquals(4, list.size());
+
+ }
+
+
+ @Test
+ public void randomSorted() {
+
+ HashSet<Integer> values = new HashSet<>();
+ for (int i = 0; i < 1000; i++) {
+
+ int value = RandomUtil.randomInt();
+ if (!values.contains(value)) {
+ values.add(value);
+ list.addSorted(value);
+ }
+ }
+
+ Assert.assertEquals(values.size(), list.size());
+
+ validateOrder(values);
+
+ Assert.assertEquals(0, values.size());
+
+ }
+
+ private void validateOrder(HashSet<Integer> values) {
+ Integer previous = null;
+ LinkedListIterator<Integer> integerIterator = list.iterator();
+ while (integerIterator.hasNext()) {
+
+ Integer value = integerIterator.next();
+ if (previous != null) {
+ Assert.assertTrue(value + " should be > " + previous, integerComparator.compare(previous, value) > 0);
+ Assert.assertTrue(value + " should be > " + previous, value.intValue() > previous.intValue());
+ }
+
+ if (values != null) {
+ values.remove(value);
+ }
+ previous = value;
+ }
+ integerIterator.close();
}
@Test