You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2022/11/28 15:04:01 UTC
[activemq-artemis] 13/13: ARTEMIS-4084 Dealing with multi consumers crashes, Improving cached addSorted
This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch 2.27.x
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
commit d79c5c41b0f5222a22a6cb71a4ed19084e0180fd
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Fri Nov 25 11:41:03 2022 -0500
ARTEMIS-4084 Dealing with multi consumers crashes, Improving cached addSorted
(cherry picked from commit af2b8e4b072e13a9925c6fecd048be130cf3688e)
---
.../artemis/utils/collections/LinkedListImpl.java | 44 ++++++++++++++++++----
.../client/ClientCrashMassiveRollbackTest.java | 4 ++
.../artemis/tests/unit/util/LinkedListTest.java | 40 +++++++++++++++-----
3 files changed, 71 insertions(+), 17 deletions(-)
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java
index 8d1c98eada..5e36d333d2 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java
@@ -232,17 +232,20 @@ public class LinkedListImpl<E> implements LinkedList<E> {
}
if (localLastAdd != null) { // as an optimization we check against the last add rather than always scan.
- if (localLastAdd.prev != null && localLastAdd.prev.val() != null) {
- if (comparator.compare(localLastAdd.prev.val(), e) > 0 && comparator.compare(localLastAdd.val(), e) < 0) {
- logger.trace("Adding {} before most recent added element {}", e, localLastAdd.val());
- addAfter(localLastAdd.prev, e);
+ if (logger.isDebugEnabled()) {
+ logger.debug("localLastAdd Value = {}, we are adding {}", localLastAdd.val(), e);
+ }
+
+ int compareLastAdd = comparator.compare(localLastAdd.val(), e);
+
+ if (compareLastAdd > 0) {
+ if (scanRight(localLastAdd, e)) {
return;
}
}
- if (localLastAdd.next != null && localLastAdd.next.val() != null) {
- if (comparator.compare(localLastAdd.val(), e) > 0 && comparator.compare(localLastAdd.next.val(), e) < 0) {
- logger.trace("Adding {} after most recent added element {}", e, localLastAdd.val());
- addAfter(localLastAdd, e);
+
+ if (compareLastAdd < 0) {
+ if (scanLeft(localLastAdd, e)) {
return;
}
}
@@ -264,6 +267,31 @@ public class LinkedListImpl<E> implements LinkedList<E> {
}
}
+ protected boolean scanRight(Node<E> position, E e) {
+ Node<E> fetching = position.next;
+ while (fetching != null) {
+ if (comparator.compare(fetching.val(), e) < 0) {
+ addAfter(position, e);
+ return true;
+ }
+ position = fetching;
+ fetching = fetching.next;
+ }
+ return false; // unlikely to happen, using this just to be safe
+ }
+
+ protected boolean scanLeft(Node<E> position, E e) {
+ Node<E> fetching = position.prev;
+ while (fetching != null) {
+ if (comparator.compare(fetching.val(), e) > 0) {
+ addAfter(fetching, e);
+ return true;
+ }
+ fetching = fetching.prev;
+ }
+ return false; // unlikely to happen, using this just to be safe
+ }
+
protected boolean addSortedScan(E e) {
logger.trace("addSortedScan {}...", e);
Node<E> fetching = head.next;
diff --git a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/client/ClientCrashMassiveRollbackTest.java b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/client/ClientCrashMassiveRollbackTest.java
index e4641f9696..82c22f4e44 100644
--- a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/client/ClientCrashMassiveRollbackTest.java
+++ b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/client/ClientCrashMassiveRollbackTest.java
@@ -77,8 +77,12 @@ public class ClientCrashMassiveRollbackTest extends ActiveMQTestBase {
Session consumerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue destination = consumerSession.createQueue(queueName);
MessageConsumer consumer = consumerSession.createConsumer(destination);
+ MessageConsumer consumer2 = consumerSession.createConsumer(destination);
+ MessageConsumer consumer3 = consumerSession.createConsumer(destination);
for (;;) {
consumer.receive();
+ consumer2.receive();
+ consumer3.receive();
}
} catch (Exception e) {
}
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/LinkedListTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/LinkedListTest.java
index b39a9e75dc..eacd29e469 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/LinkedListTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/LinkedListTest.java
@@ -33,9 +33,9 @@ import java.util.concurrent.atomic.AtomicInteger;
import io.netty.util.collection.LongObjectHashMap;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.RandomUtil;
-import org.apache.activemq.artemis.utils.collections.NodeStore;
import org.apache.activemq.artemis.utils.collections.LinkedListImpl;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
+import org.apache.activemq.artemis.utils.collections.NodeStore;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -103,23 +103,45 @@ public class LinkedListTest extends ActiveMQTestBase {
list.addSorted(1);
list.addSorted(3);
list.addSorted(4);
- Assert.assertEquals(0, scans); // no scans made until now
- list.addSorted(2); // this should need a scan
- Assert.assertEquals(1, scans);
+ list.addSorted(2);
list.addSorted(10);
list.addSorted(20);
list.addSorted(19);
- list.addSorted(7); // this will need a scan as it's totally random
- Assert.assertEquals(2, scans);
+ list.addSorted(7);
list.addSorted(8);
- Assert.assertEquals(2, scans);
+ Assert.assertEquals(0, scans); // no full scans should be done
Assert.assertEquals(1, (int)list.poll());
list.addSorted(9);
- Assert.assertEquals(3, scans); // remove (poll) should clear the last added cache
- printDebug();
+ Assert.assertEquals(1, scans); // remove (poll) should clear the last added cache, a scan will be needed
+ printDebug();
validateOrder(null);
+ }
+ @Test
+ public void scanDirectionalTest() {
+ list.addSorted(9);
+ Assert.assertEquals(1, list.size());
+ list.addSorted(5);
+ Assert.assertEquals(2, list.size());
+ list.addSorted(6);
+ Assert.assertEquals(3, list.size());
+ list.addSorted(2);
+ Assert.assertEquals(4, list.size());
+ list.addSorted(7);
+ Assert.assertEquals(5, list.size());
+ list.addSorted(4);
+ Assert.assertEquals(6, list.size());
+ list.addSorted(8);
+ Assert.assertEquals(7, list.size());
+ list.addSorted(1);
+ Assert.assertEquals(8, list.size());
+ list.addSorted(10);
+ Assert.assertEquals(9, list.size());
+ list.addSorted(3);
+ Assert.assertEquals(10, list.size());
+ printDebug();
+ validateOrder(null);
}
private void printDebug() {