You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2022/11/09 20:43:08 UTC

[activemq-artemis] branch main updated: ARTEMIS-4084 Fixing addSorted with large transactions

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 03b82142eb ARTEMIS-4084 Fixing addSorted with large transactions
03b82142eb is described below

commit 03b82142eb0844b9de02ca3d7ed365d849e3ac02
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Tue Nov 8 09:42:16 2022 -0500

    ARTEMIS-4084 Fixing addSorted with large transactions
    
    When cancelling a large number of messages, addSorted could hold a lock for too long, causing the server to crash under the CriticalAnalyzer.
    
    co-authored: AntonRoskvist <an...@volvo.com> (discovered the issue and provided the test ClientCrashMassiveRollbackTest.java)
---
 .../artemis/utils/collections/LinkedListImpl.java  |  67 ++++++++++--
 .../artemis/core/server/impl/QueueImpl.java        |  42 +++++--
 .../artemis/core/server/impl/RefsOperation.java    |   4 +-
 .../client/ClientCrashMassiveRollbackTest.java     | 121 +++++++++++++++++++++
 .../artemis/tests/unit/util/LinkedListTest.java    |  73 +++++++++++--
 5 files changed, 281 insertions(+), 26 deletions(-)

diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java
index 76ee69499e..8d1c98eada 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java
@@ -16,12 +16,16 @@
  */
 package org.apache.activemq.artemis.utils.collections;
 
+import java.lang.invoke.MethodHandles;
 import java.lang.reflect.Array;
 import java.util.Comparator;
 import java.util.NoSuchElementException;
 import java.util.Objects;
 import java.util.function.Consumer;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * A linked list implementation which allows multiple iterators to exist at the same time on the queue, and which see any
  * elements added or removed from the queue either directly or via iterators.
@@ -30,6 +34,8 @@ import java.util.function.Consumer;
  */
 public class LinkedListImpl<E> implements LinkedList<E> {
 
+   private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
    private static final int INITIAL_ITERATOR_ARRAY_SIZE = 10;
 
    private final Node<E> head = new NodeHolder<>(null);
@@ -42,6 +48,8 @@ public class LinkedListImpl<E> implements LinkedList<E> {
    private int nextIndex;
    private NodeStore<E> nodeStore;
 
+   private volatile Node<E> lastAdd;
+
    public LinkedListImpl() {
       this(null, null);
    }
@@ -155,12 +163,18 @@ public class LinkedListImpl<E> implements LinkedList<E> {
    }
 
    private void itemAdded(Node<E> node, E item) {
+      assert node.val() == item;
+      lastAdd = node;
+      if (logger.isTraceEnabled()) {
+         logger.trace("Setting lastAdd as {}, e={}", lastAdd, lastAdd.val());
+      }
       if (nodeStore != null) {
          putID(item, node);
       }
    }
 
    private void itemRemoved(Node<E> node) {
+      lastAdd = null;
       if (nodeStore != null) {
          nodeStore.removeNode(node.val(), node);
       }
@@ -186,13 +200,22 @@ public class LinkedListImpl<E> implements LinkedList<E> {
    }
 
    public void addSorted(E e) {
+      final Node<E> localLastAdd = lastAdd;
+
+      logger.trace("**** addSorted element {}", e);
+
       if (comparator == null) {
          throw new NullPointerException("comparator=null");
       }
+
       if (size == 0) {
+         logger.trace("adding head as there are no elements {}", e);
          addHead(e);
       } else {
          if (comparator.compare(head.next.val(), e) < 0) {
+            if (logger.isTraceEnabled()) {
+               logger.trace("addHead as e={} and head={}", e, head.next.val());
+            }
             addHead(e);
             return;
          }
@@ -203,18 +226,30 @@ public class LinkedListImpl<E> implements LinkedList<E> {
          // This would be an optimization for our usage.
          // avoiding scanning the entire List just to add at the end, so we compare the end first.
          if (comparator.compare(tail.val(), e) >= 0) {
+            logger.trace("addTail as e={} and tail={}", e, tail.val());
             addTail(e);
             return;
          }
 
-         Node<E> fetching = head.next;
-         while (fetching.next != null) {
-            int compareNext = comparator.compare(fetching.next.val(), e);
-            if (compareNext <= 0) {
-               addAfter(fetching, e);
-               return;
+         if (localLastAdd != null) { // as an optimization we check against the last add rather than always scan.
+            if (localLastAdd.prev != null && localLastAdd.prev.val() != null) {
+               if (comparator.compare(localLastAdd.prev.val(), e) > 0 && comparator.compare(localLastAdd.val(), e) < 0) {
+                  logger.trace("Adding {} before most recent added element {}", e, localLastAdd.val());
+                  addAfter(localLastAdd.prev, e);
+                  return;
+               }
             }
-            fetching = fetching.next;
+            if (localLastAdd.next != null && localLastAdd.next.val() != null) {
+               if (comparator.compare(localLastAdd.val(), e) > 0 && comparator.compare(localLastAdd.next.val(), e) < 0) {
+                  logger.trace("Adding {} after most recent added element {}", e, localLastAdd.val());
+                  addAfter(localLastAdd, e);
+                  return;
+               }
+            }
+         }
+
+         if (addSortedScan(e)) {
+            return;
          }
 
          // this shouldn't happen as the tail was compared before iterating
@@ -229,6 +264,22 @@ public class LinkedListImpl<E> implements LinkedList<E> {
       }
    }
 
+   protected boolean addSortedScan(E e) {
+      logger.trace("addSortedScan {}...", e);
+      Node<E> fetching = head.next;
+      while (fetching.next != null) {
+         int compareNext = comparator.compare(fetching.next.val(), e);
+         if (compareNext <= 0) {
+            addAfter(fetching, e);
+            logger.trace("... addSortedScan done, returning true");
+            return true;
+         }
+         fetching = fetching.next;
+      }
+      logger.trace("... addSortedScan done, could not find a spot, returning false");
+      return false;
+   }
+
    private void addAfter(Node<E> node, E e) {
       Node<E> newNode = Node.with(e);
       Node<E> nextNode = node.next;
@@ -236,7 +287,7 @@ public class LinkedListImpl<E> implements LinkedList<E> {
       newNode.prev = node;
       newNode.next = nextNode;
       nextNode.prev = newNode;
-      itemAdded(node, e);
+      itemAdded(newNode, e);
       size++;
    }
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 908bdca23e..d222d192fd 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -1100,8 +1100,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
    /* Called when a message is cancelled back into the queue */
    @Override
    public void addHead(final MessageReference ref, boolean scheduling) {
-      if (logger.isDebugEnabled()) {
-         logger.debug("AddHead, size = {}, intermediate size = {}, references size = {}\nreference={}", queueMemorySize, intermediateMessageReferences.size(), messageReferences.size(), ref);
+      if (logger.isTraceEnabled()) {
+         logger.trace("AddHead, size = {}, intermediate size = {}, references size = {}\nreference={}", queueMemorySize, intermediateMessageReferences.size(), messageReferences.size(), ref);
       }
       try (ArtemisCloseable metric = measureCritical(CRITICAL_PATH_ADD_HEAD)) {
          synchronized (this) {
@@ -1125,11 +1125,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
    /* Called when a message is cancelled back into the queue */
    @Override
    public void addSorted(final MessageReference ref, boolean scheduling) {
-      if (logger.isDebugEnabled()) {
-         logger.debug("addSorted, size = {}, intermediate size = {}, references size = {}\nreference={}", queueMemorySize, intermediateMessageReferences.size(), messageReferences.size(), ref);
+      if (logger.isTraceEnabled()) {
+         logger.trace("addSorted, size = {}, intermediate size = {}, references size = {}\nreference={}", queueMemorySize, intermediateMessageReferences.size(), messageReferences.size(), ref);
       }
       try (ArtemisCloseable metric = measureCritical(CRITICAL_PATH_ADD_HEAD)) {
-         synchronized (this) {
+         synchronized (QueueImpl.this) {
             if (ringSize != -1) {
                enforceRing(ref, false, true);
             }
@@ -1165,6 +1165,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
    /* Called when a message is cancelled back into the queue */
    @Override
    public void addSorted(final List<MessageReference> refs, boolean scheduling) {
+      if (refs.size() > MAX_DELIVERIES_IN_LOOP) {
+         logger.debug("Switching addSorted call to addSortedLargeTX on queue {}", name);
+         addSortedLargeTX(refs, scheduling);
+         return;
+      }
       try (ArtemisCloseable metric = measureCritical(CRITICAL_PATH_ADD_HEAD)) {
          synchronized (this) {
             for (MessageReference ref : refs) {
@@ -1178,6 +1183,29 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       }
    }
 
+   // Perhaps we could just replace addSorted by addSortedLargeTX
+   // However I am not 100% confident we could always resetAllIterators
+   // we certainly can in the case of a rollback in a huge TX.
+   // so I am just playing safe and keeping the original semantic for small transactions.
+   private void addSortedLargeTX(final List<MessageReference> refs, boolean scheduling) {
+      for (MessageReference ref : refs) {
+         // When dealing with large transactions, we are not holding a synchronization lock here.
+         // addSorted will lock for each individual adds
+         addSorted(ref, scheduling);
+      }
+
+      if (logger.isDebugEnabled()) {
+         logger.debug("addSortedHugeLoad finished on queue {}", name);
+      }
+
+      synchronized (this) {
+
+         resetAllIterators();
+
+         deliverAsync();
+      }
+   }
+
    @Override
    public synchronized void reload(final MessageReference ref) {
       queueMemorySize.addSize(ref.getMessageMemoryEstimate());
@@ -2983,8 +3011,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
     * are no more matching or available messages.
     */
    private boolean deliver() {
-      if (logger.isDebugEnabled()) {
-         logger.debug("Queue {} doing deliver. messageReferences={} with consumers={}", name, messageReferences.size(), getConsumerCount());
+      if (logger.isTraceEnabled()) {
+         logger.trace("Queue {} doing deliver. messageReferences={} with consumers={}", name, messageReferences.size(), getConsumerCount());
       }
 
       scheduledRunners.decrementAndGet();
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
index 65f39783c9..6930047c4f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
@@ -127,9 +127,7 @@ public class RefsOperation extends TransactionOperationAbstract {
 
          QueueImpl queue = entry.getKey();
 
-         synchronized (queue) {
-            queue.postRollback(refs);
-         }
+         queue.postRollback(refs);
       }
 
       if (!ackedRefs.isEmpty()) {
diff --git a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/client/ClientCrashMassiveRollbackTest.java b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/client/ClientCrashMassiveRollbackTest.java
new file mode 100644
index 0000000000..9e5bbbef65
--- /dev/null
+++ b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/client/ClientCrashMassiveRollbackTest.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.soak.client;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.client.ClientProducer;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.api.core.management.QueueControl;
+import org.apache.activemq.artemis.api.core.management.ResourceNames;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.utils.Wait;
+import org.apache.activemq.artemis.utils.critical.CriticalAnalyzerPolicy;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ClientCrashMassiveRollbackTest extends ActiveMQTestBase {
+   protected ActiveMQServer server;
+   protected ClientSession session;
+   protected ClientSessionFactory sf;
+   protected ServerLocator locator;
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+      Configuration config = createDefaultNettyConfig();
+      config.setCriticalAnalyzer(true);
+      config.setCriticalAnalyzerTimeout(10000);
+      config.setCriticalAnalyzerCheckPeriod(5000);
+      config.setConnectionTTLOverride(5000);
+      config.setCriticalAnalyzerPolicy(CriticalAnalyzerPolicy.LOG);
+      server = createServer(false, config);
+      server.start();
+   }
+
+   @Test
+   public void clientCrashMassiveRollbackTest() throws Exception {
+      final String queueName = "queueName";
+      final int messageCount = 1000000;
+
+      ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("(tcp://localhost:61616)");
+      factory.setConsumerWindowSize(-1);
+      factory.setConfirmationWindowSize(10240000);
+      Connection connection = factory.createConnection();
+      connection.start();
+
+      Thread thread = new Thread(() -> {
+         try {
+            Session consumerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
+            Queue destination = consumerSession.createQueue(queueName);
+            MessageConsumer consumer = consumerSession.createConsumer(destination);
+            for (;;) {
+               consumer.receive();
+            }
+         } catch (Exception e) {
+         }
+      });
+
+      locator = createNettyNonHALocator();
+      locator.setConfirmationWindowSize(10240000);
+      sf = createSessionFactory(locator);
+      session = addClientSession(sf.createSession(false, true, true));
+      SendAcknowledgementHandler sendHandler = message -> {
+      };
+      session.setSendAcknowledgementHandler(sendHandler);
+      session.createQueue(new QueueConfiguration(queueName).setAddress(queueName).setRoutingType(RoutingType.ANYCAST));
+      ClientProducer producer = session.createProducer(queueName);
+      QueueControl queueControl = (QueueControl)server.getManagementService().getResource(ResourceNames.QUEUE + queueName);
+
+      thread.start();
+
+      for (int i = 0; i < messageCount; i++) {
+         producer.send(session.createMessage(true));
+      }
+      producer.close();
+
+      while (queueControl.getDeliveringCount() < messageCount) {
+         Thread.sleep(1000);
+      }
+
+      thread.interrupt();
+
+      Assert.assertEquals(messageCount, queueControl.getMessageCount());
+      Assert.assertEquals(ActiveMQServer.SERVER_STATE.STARTED, server.getState());
+
+      server.stop();
+
+      Wait.assertEquals(ActiveMQServer.SERVER_STATE.STOPPED, server::getState, 5000, 100);
+
+
+   }
+
+}
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/LinkedListTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/LinkedListTest.java
index 2245e8b02c..f95e91fc2c 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/LinkedListTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/LinkedListTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.artemis.tests.unit.util;
 
+import java.lang.invoke.MethodHandles;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -38,27 +39,42 @@ import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class LinkedListTest extends ActiveMQTestBase {
 
+   private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   private int scans = 0;
    private LinkedListImpl<Integer> list;
 
    @Override
    @Before
    public void setUp() throws Exception {
       super.setUp();
-      list = new LinkedListImpl<>(integerComparator);
+      list = new LinkedListImpl<>(integerComparator) {
+         @Override
+         protected boolean addSortedScan(Integer e) {
+            scans++;
+            return super.addSortedScan(e);
+         }
+      };
    }
 
    Comparator<Integer> integerComparator = new Comparator<Integer>() {
       @Override
       public int compare(Integer o1, Integer o2) {
+         logger.trace("Compare {} and {}", o1, o2);
          if (o1.intValue() == o2.intValue()) {
+            logger.trace("Return 0");
             return 0;
          }
          if (o2.intValue() > o1.intValue()) {
+            logger.trace("o2 is greater than, returning 1");
             return 1;
          } else {
+            logger.trace("o2 is lower than, returning -1");
             return -1;
          }
       }
@@ -66,27 +82,68 @@ public class LinkedListTest extends ActiveMQTestBase {
 
    @Test
    public void addSorted() {
+      Assert.assertEquals(0, scans); // sanity check
 
       list.addSorted(1);
       list.addSorted(3);
       list.addSorted(2);
       list.addSorted(0);
+
+      Assert.assertEquals(0, scans); // all adds were somewhat ordered, it shouldn't be doing any scans
+
       validateOrder(null);
       Assert.assertEquals(4, list.size());
 
    }
 
+   @Test
+   public void addSortedCachedLast() {
+      Assert.assertEquals(0, scans); // just a sanity check
+      list.addSorted(5);
+      list.addSorted(1);
+      list.addSorted(3);
+      list.addSorted(4);
+      Assert.assertEquals(0, scans); // no scans made until now
+      list.addSorted(2); // this should need a scan
+      Assert.assertEquals(1, scans);
+      list.addSorted(10);
+      list.addSorted(20);
+      list.addSorted(7); // this will need a scan as it's totally random
+      Assert.assertEquals(2, scans);
+      printDebug();
+
+      validateOrder(null);
+
+   }
+
+   private void printDebug() {
+      if (logger.isDebugEnabled()) {
+         logger.debug("**** list output:");
+         LinkedListIterator<Integer> integerIterator = list.iterator();
+         while (integerIterator.hasNext()) {
+            logger.debug("list {}", integerIterator.next());
+         }
+         integerIterator.close();
+      }
+   }
 
    @Test
    public void randomSorted() {
 
-      HashSet<Integer> values = new HashSet<>();
-      for (int i = 0; i < 1000; i++) {
+      int elements = 10_000;
 
-         int value = RandomUtil.randomInt();
-         if (!values.contains(value)) {
-            values.add(value);
-            list.addSorted(value);
+      HashSet<Integer> values = new HashSet<>();
+      for (int i = 0; i < elements; i++) {
+         for (;;) { // a retry loop, if a random give me the same value twice, I would retry
+            int value = RandomUtil.randomInt();
+            if (!values.contains(value)) { // validating if the random is repeated or not, and retrying otherwise
+               if (logger.isDebugEnabled()) {
+                  logger.debug("Adding {}", value);
+               }
+               values.add(value);
+               list.addSorted(value);
+               break;
+            }
          }
       }
 
@@ -102,8 +159,8 @@ public class LinkedListTest extends ActiveMQTestBase {
       Integer previous = null;
       LinkedListIterator<Integer> integerIterator = list.iterator();
       while (integerIterator.hasNext()) {
-
          Integer value = integerIterator.next();
+         logger.debug("Reading {}", value);
          if (previous != null) {
             Assert.assertTrue(value + " should be > " + previous, integerComparator.compare(previous, value) > 0);
             Assert.assertTrue(value + " should be > " + previous, value.intValue() > previous.intValue());