You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2022/11/28 15:04:01 UTC

[activemq-artemis] 13/13: ARTEMIS-4084 Dealing with multi consumers crashes, Improving cached addSorted

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch 2.27.x
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit d79c5c41b0f5222a22a6cb71a4ed19084e0180fd
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Fri Nov 25 11:41:03 2022 -0500

    ARTEMIS-4084 Dealing with multi consumers crashes, Improving cached addSorted
    
    (cherry picked from commit af2b8e4b072e13a9925c6fecd048be130cf3688e)
---
 .../artemis/utils/collections/LinkedListImpl.java  | 44 ++++++++++++++++++----
 .../client/ClientCrashMassiveRollbackTest.java     |  4 ++
 .../artemis/tests/unit/util/LinkedListTest.java    | 40 +++++++++++++++-----
 3 files changed, 71 insertions(+), 17 deletions(-)

diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java
index 8d1c98eada..5e36d333d2 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java
@@ -232,17 +232,20 @@ public class LinkedListImpl<E> implements LinkedList<E> {
          }
 
          if (localLastAdd != null) { // as an optimization we check against the last add rather than always scan.
-            if (localLastAdd.prev != null && localLastAdd.prev.val() != null) {
-               if (comparator.compare(localLastAdd.prev.val(), e) > 0 && comparator.compare(localLastAdd.val(), e) < 0) {
-                  logger.trace("Adding {} before most recent added element {}", e, localLastAdd.val());
-                  addAfter(localLastAdd.prev, e);
+            if (logger.isDebugEnabled()) {
+               logger.debug("localLastAdd Value = {}, we are adding {}", localLastAdd.val(), e);
+            }
+
+            int compareLastAdd = comparator.compare(localLastAdd.val(), e);
+
+            if (compareLastAdd > 0) {
+               if (scanRight(localLastAdd, e)) {
                   return;
                }
             }
-            if (localLastAdd.next != null && localLastAdd.next.val() != null) {
-               if (comparator.compare(localLastAdd.val(), e) > 0 && comparator.compare(localLastAdd.next.val(), e) < 0) {
-                  logger.trace("Adding {} after most recent added element {}", e, localLastAdd.val());
-                  addAfter(localLastAdd, e);
+
+            if (compareLastAdd < 0) {
+               if (scanLeft(localLastAdd, e)) {
                   return;
                }
             }
@@ -264,6 +267,31 @@ public class LinkedListImpl<E> implements LinkedList<E> {
       }
    }
 
+   protected boolean scanRight(Node<E> position, E e) { // walks the 'next' links from position looking for the slot where e belongs
+      Node<E> fetching = position.next; // candidate node; 'position' is always the node visited just before it
+      while (fetching != null) {
+         if (comparator.compare(fetching.val(), e) < 0) { // first node that compares below e: e goes right before it
+            addAfter(position, e); // i.e. between position and fetching
+            return true; // inserted, the caller needs no further scan
+         }
+         position = fetching; // advance both references one node along 'next'
+         fetching = fetching.next;
+      }
+      return false; // ran off the end without inserting; unlikely to happen, using this just to be safe
+   }
+
+   protected boolean scanLeft(Node<E> position, E e) { // walks the 'prev' links from position looking for the slot where e belongs
+      Node<E> fetching = position.prev; // start with the node immediately before position
+      while (fetching != null) {
+         if (comparator.compare(fetching.val(), e) > 0) { // NOTE(review): a head node whose val() is null would reach the comparator here - confirm the caller rules this out
+            addAfter(fetching, e); // first node that compares above e: e goes right after it
+            return true; // inserted, the caller needs no further scan
+         }
+         fetching = fetching.prev; // keep moving along 'prev'
+      }
+      return false; // ran out of 'prev' links without inserting; unlikely to happen, using this just to be safe
+   }
+
    protected boolean addSortedScan(E e) {
       logger.trace("addSortedScan {}...", e);
       Node<E> fetching = head.next;
diff --git a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/client/ClientCrashMassiveRollbackTest.java b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/client/ClientCrashMassiveRollbackTest.java
index e4641f9696..82c22f4e44 100644
--- a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/client/ClientCrashMassiveRollbackTest.java
+++ b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/client/ClientCrashMassiveRollbackTest.java
@@ -77,8 +77,12 @@ public class ClientCrashMassiveRollbackTest extends ActiveMQTestBase {
             Session consumerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
             Queue destination = consumerSession.createQueue(queueName);
             MessageConsumer consumer = consumerSession.createConsumer(destination);
+            MessageConsumer consumer2 = consumerSession.createConsumer(destination);
+            MessageConsumer consumer3 = consumerSession.createConsumer(destination);
             for (;;) {
                consumer.receive();
+               consumer2.receive();
+               consumer3.receive();
             }
          } catch (Exception e) {
          }
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/LinkedListTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/LinkedListTest.java
index b39a9e75dc..eacd29e469 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/LinkedListTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/LinkedListTest.java
@@ -33,9 +33,9 @@ import java.util.concurrent.atomic.AtomicInteger;
 import io.netty.util.collection.LongObjectHashMap;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.apache.activemq.artemis.utils.RandomUtil;
-import org.apache.activemq.artemis.utils.collections.NodeStore;
 import org.apache.activemq.artemis.utils.collections.LinkedListImpl;
 import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
+import org.apache.activemq.artemis.utils.collections.NodeStore;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -103,23 +103,45 @@ public class LinkedListTest extends ActiveMQTestBase {
       list.addSorted(1);
       list.addSorted(3);
       list.addSorted(4);
-      Assert.assertEquals(0, scans); // no scans made until now
-      list.addSorted(2); // this should need a scan
-      Assert.assertEquals(1, scans);
+      list.addSorted(2);
       list.addSorted(10);
       list.addSorted(20);
       list.addSorted(19);
-      list.addSorted(7); // this will need a scan as it's totally random
-      Assert.assertEquals(2, scans);
+      list.addSorted(7);
       list.addSorted(8);
-      Assert.assertEquals(2, scans);
+      Assert.assertEquals(0, scans); // no full scans should be done
       Assert.assertEquals(1, (int)list.poll());
       list.addSorted(9);
-      Assert.assertEquals(3, scans); // remove (poll) should clear the last added cache
-      printDebug();
+      Assert.assertEquals(1, scans); // remove (poll) should clear the last added cache, a scan will be needed
 
+      printDebug();
       validateOrder(null);
+   }
 
+   @Test
+   public void scanDirectionalTest() { // adds the values 1..10 once each, in non-monotonic order, checking size() after every add
+      list.addSorted(9);
+      Assert.assertEquals(1, list.size());
+      list.addSorted(5); // lower than the previous add
+      Assert.assertEquals(2, list.size());
+      list.addSorted(6); // higher than the previous add
+      Assert.assertEquals(3, list.size());
+      list.addSorted(2); // from here on each add alternates below / above the previous one
+      Assert.assertEquals(4, list.size());
+      list.addSorted(7);
+      Assert.assertEquals(5, list.size());
+      list.addSorted(4);
+      Assert.assertEquals(6, list.size());
+      list.addSorted(8);
+      Assert.assertEquals(7, list.size());
+      list.addSorted(1); // smallest value used by this test
+      Assert.assertEquals(8, list.size());
+      list.addSorted(10); // largest value used by this test
+      Assert.assertEquals(9, list.size());
+      list.addSorted(3);
+      Assert.assertEquals(10, list.size()); // all ten values are present
+      printDebug();
+      validateOrder(null); // presumably asserts the resulting element order - helper body is not visible here, confirm
    }
 
    private void printDebug() {