You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2022/11/09 20:43:08 UTC
[activemq-artemis] branch main updated: ARTEMIS-4084 Fixing addSorted with large transactions
This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 03b82142eb ARTEMIS-4084 Fixing addSorted with large transactions
03b82142eb is described below
commit 03b82142eb0844b9de02ca3d7ed365d849e3ac02
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Tue Nov 8 09:42:16 2022 -0500
ARTEMIS-4084 Fixing addSorted with large transactions
When cancelling a large number of messages, addSorted could hold the queue lock for too long, causing the server to crash under the CriticalAnalyzer.
Co-authored-by: AntonRoskvist <an...@volvo.com> (who discovered the issue and provided the test ClientCrashMassiveRollbackTest.java)
---
.../artemis/utils/collections/LinkedListImpl.java | 67 ++++++++++--
.../artemis/core/server/impl/QueueImpl.java | 42 +++++--
.../artemis/core/server/impl/RefsOperation.java | 4 +-
.../client/ClientCrashMassiveRollbackTest.java | 121 +++++++++++++++++++++
.../artemis/tests/unit/util/LinkedListTest.java | 73 +++++++++++--
5 files changed, 281 insertions(+), 26 deletions(-)
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java
index 76ee69499e..8d1c98eada 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java
@@ -16,12 +16,16 @@
*/
package org.apache.activemq.artemis.utils.collections;
+import java.lang.invoke.MethodHandles;
import java.lang.reflect.Array;
import java.util.Comparator;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.function.Consumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* A linked list implementation which allows multiple iterators to exist at the same time on the queue, and which see any
* elements added or removed from the queue either directly or via iterators.
@@ -30,6 +34,8 @@ import java.util.function.Consumer;
*/
public class LinkedListImpl<E> implements LinkedList<E> {
+ private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
private static final int INITIAL_ITERATOR_ARRAY_SIZE = 10;
private final Node<E> head = new NodeHolder<>(null);
@@ -42,6 +48,8 @@ public class LinkedListImpl<E> implements LinkedList<E> {
private int nextIndex;
private NodeStore<E> nodeStore;
+ private volatile Node<E> lastAdd;
+
public LinkedListImpl() {
this(null, null);
}
@@ -155,12 +163,18 @@ public class LinkedListImpl<E> implements LinkedList<E> {
}
private void itemAdded(Node<E> node, E item) {
+ assert node.val() == item;
+ // Remember the most recently inserted node: addSorted first tries to insert next to it
+ // before falling back to a full linear scan (addSortedScan).
+ lastAdd = node;
+ if (logger.isTraceEnabled()) {
+    // Log the arguments rather than re-reading the volatile field, which could change
+    // (or be nulled by itemRemoved) between the two reads.
+    logger.trace("Setting lastAdd as {}, e={}", node, item);
+ }
if (nodeStore != null) {
putID(item, node);
}
}
private void itemRemoved(Node<E> node) {
+ lastAdd = null;
if (nodeStore != null) {
nodeStore.removeNode(node.val(), node);
}
@@ -186,13 +200,22 @@ public class LinkedListImpl<E> implements LinkedList<E> {
}
public void addSorted(E e) {
+ final Node<E> localLastAdd = lastAdd;
+
+ logger.trace("**** addSorted element {}", e);
+
if (comparator == null) {
throw new NullPointerException("comparator=null");
}
+
if (size == 0) {
+ logger.trace("adding head as there are no elements {}", e);
addHead(e);
} else {
if (comparator.compare(head.next.val(), e) < 0) {
+ if (logger.isTraceEnabled()) {
+ logger.trace("addHead as e={} and head={}", e, head.next.val());
+ }
addHead(e);
return;
}
@@ -203,18 +226,30 @@ public class LinkedListImpl<E> implements LinkedList<E> {
// This would be an optimization for our usage.
// avoiding scanning the entire List just to add at the end, so we compare the end first.
if (comparator.compare(tail.val(), e) >= 0) {
+ logger.trace("addTail as e={} and tail={}", e, tail.val());
addTail(e);
return;
}
- Node<E> fetching = head.next;
- while (fetching.next != null) {
- int compareNext = comparator.compare(fetching.next.val(), e);
- if (compareNext <= 0) {
- addAfter(fetching, e);
- return;
+ if (localLastAdd != null) { // as an optimization we check against the last add rather than always scan.
+ if (localLastAdd.prev != null && localLastAdd.prev.val() != null) {
+ if (comparator.compare(localLastAdd.prev.val(), e) > 0 && comparator.compare(localLastAdd.val(), e) < 0) {
+ logger.trace("Adding {} before most recent added element {}", e, localLastAdd.val());
+ addAfter(localLastAdd.prev, e);
+ return;
+ }
}
- fetching = fetching.next;
+ if (localLastAdd.next != null && localLastAdd.next.val() != null) {
+ if (comparator.compare(localLastAdd.val(), e) > 0 && comparator.compare(localLastAdd.next.val(), e) < 0) {
+ logger.trace("Adding {} after most recent added element {}", e, localLastAdd.val());
+ addAfter(localLastAdd, e);
+ return;
+ }
+ }
+ }
+
+ if (addSortedScan(e)) {
+ return;
}
// this shouldn't happen as the tail was compared before iterating
@@ -229,6 +264,22 @@ public class LinkedListImpl<E> implements LinkedList<E> {
}
}
+ /**
+  * Slow path of addSorted: walks the list from its first element and inserts {@code e} after the
+  * first node whose successor satisfies {@code comparator.compare(successor, e) <= 0}.
+  * The new element is therefore always placed between two existing nodes, never before the first
+  * or after the last one (addSorted handles those cases itself before calling this).
+  * Being protected, subclasses (such as LinkedListTest) can override it to observe how often a
+  * full scan is needed.
+  *
+  * @param e the element to insert
+  * @return {@code true} if a position was found and {@code e} was inserted, {@code false} otherwise
+  */
+ protected boolean addSortedScan(E e) {
+    logger.trace("addSortedScan {}...", e);
+    for (Node<E> current = head.next; current.next != null; current = current.next) {
+       if (comparator.compare(current.next.val(), e) <= 0) {
+          addAfter(current, e);
+          logger.trace("... addSortedScan done, returning true");
+          return true;
+       }
+    }
+    logger.trace("... addSortedScan done, could not find a spot, returning false");
+    return false;
+ }
+
private void addAfter(Node<E> node, E e) {
Node<E> newNode = Node.with(e);
Node<E> nextNode = node.next;
@@ -236,7 +287,7 @@ public class LinkedListImpl<E> implements LinkedList<E> {
newNode.prev = node;
newNode.next = nextNode;
nextNode.prev = newNode;
- itemAdded(node, e);
+ itemAdded(newNode, e);
size++;
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 908bdca23e..d222d192fd 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -1100,8 +1100,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
/* Called when a message is cancelled back into the queue */
@Override
public void addHead(final MessageReference ref, boolean scheduling) {
- if (logger.isDebugEnabled()) {
- logger.debug("AddHead, size = {}, intermediate size = {}, references size = {}\nreference={}", queueMemorySize, intermediateMessageReferences.size(), messageReferences.size(), ref);
+ if (logger.isTraceEnabled()) {
+ logger.trace("AddHead, size = {}, intermediate size = {}, references size = {}\nreference={}", queueMemorySize, intermediateMessageReferences.size(), messageReferences.size(), ref);
}
try (ArtemisCloseable metric = measureCritical(CRITICAL_PATH_ADD_HEAD)) {
synchronized (this) {
@@ -1125,11 +1125,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
/* Called when a message is cancelled back into the queue */
@Override
public void addSorted(final MessageReference ref, boolean scheduling) {
- if (logger.isDebugEnabled()) {
- logger.debug("addSorted, size = {}, intermediate size = {}, references size = {}\nreference={}", queueMemorySize, intermediateMessageReferences.size(), messageReferences.size(), ref);
+ if (logger.isTraceEnabled()) {
+ logger.trace("addSorted, size = {}, intermediate size = {}, references size = {}\nreference={}", queueMemorySize, intermediateMessageReferences.size(), messageReferences.size(), ref);
}
try (ArtemisCloseable metric = measureCritical(CRITICAL_PATH_ADD_HEAD)) {
- synchronized (this) {
+ synchronized (QueueImpl.this) {
if (ringSize != -1) {
enforceRing(ref, false, true);
}
@@ -1165,6 +1165,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
/* Called when a message is cancelled back into the queue */
@Override
public void addSorted(final List<MessageReference> refs, boolean scheduling) {
+ if (refs.size() > MAX_DELIVERIES_IN_LOOP) {
+ logger.debug("Switching addSorted call to addSortedLargeTX on queue {}", name);
+ addSortedLargeTX(refs, scheduling);
+ return;
+ }
try (ArtemisCloseable metric = measureCritical(CRITICAL_PATH_ADD_HEAD)) {
synchronized (this) {
for (MessageReference ref : refs) {
@@ -1178,6 +1183,29 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
}
+ // Adds a very large list of cancelled references one at a time, so the queue lock is released
+ // between elements instead of being held for the whole list (a single long hold is what tripped
+ // the critical analyzer on massive rollbacks). Iterators are reset and delivery is rescheduled
+ // once at the end.
+ // This could perhaps replace addSorted(List) entirely, but it is not certain that resetting all
+ // iterators is always safe there. It is safe when rolling back a huge transaction, so the
+ // original semantics are kept for small lists.
+ private void addSortedLargeTX(final List<MessageReference> refs, boolean scheduling) {
+    for (MessageReference ref : refs) {
+       // No lock is held here: each addSorted(ref) call synchronizes on its own.
+       addSorted(ref, scheduling);
+    }
+
+    logger.debug("addSortedLargeTX finished on queue {}", name);
+
+    synchronized (this) {
+       resetAllIterators();
+       deliverAsync();
+    }
+ }
+
@Override
public synchronized void reload(final MessageReference ref) {
queueMemorySize.addSize(ref.getMessageMemoryEstimate());
@@ -2983,8 +3011,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
* are no more matching or available messages.
*/
private boolean deliver() {
- if (logger.isDebugEnabled()) {
- logger.debug("Queue {} doing deliver. messageReferences={} with consumers={}", name, messageReferences.size(), getConsumerCount());
+ if (logger.isTraceEnabled()) {
+ logger.trace("Queue {} doing deliver. messageReferences={} with consumers={}", name, messageReferences.size(), getConsumerCount());
}
scheduledRunners.decrementAndGet();
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
index 65f39783c9..6930047c4f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
@@ -127,9 +127,7 @@ public class RefsOperation extends TransactionOperationAbstract {
QueueImpl queue = entry.getKey();
- synchronized (queue) {
- queue.postRollback(refs);
- }
+ queue.postRollback(refs);
}
if (!ackedRefs.isEmpty()) {
diff --git a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/client/ClientCrashMassiveRollbackTest.java b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/client/ClientCrashMassiveRollbackTest.java
new file mode 100644
index 0000000000..9e5bbbef65
--- /dev/null
+++ b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/client/ClientCrashMassiveRollbackTest.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.soak.client;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.client.ClientProducer;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.api.core.management.QueueControl;
+import org.apache.activemq.artemis.api.core.management.ResourceNames;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.utils.Wait;
+import org.apache.activemq.artemis.utils.critical.CriticalAnalyzerPolicy;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ClientCrashMassiveRollbackTest extends ActiveMQTestBase {
+   protected ActiveMQServer server;
+   protected ClientSession session;
+   protected ClientSessionFactory sf;
+   protected ServerLocator locator;
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+      Configuration config = createDefaultNettyConfig();
+      // Critical analyzer enabled with short timeouts, so a lock held for too long is noticed
+      // while the test runs.
+      config.setCriticalAnalyzer(true);
+      config.setCriticalAnalyzerTimeout(10000);
+      config.setCriticalAnalyzerCheckPeriod(5000);
+      config.setConnectionTTLOverride(5000);
+      config.setCriticalAnalyzerPolicy(CriticalAnalyzerPolicy.LOG);
+      server = createServer(false, config);
+      server.start();
+   }
+
+   /**
+    * A transacted consumer receives a very large number of messages and never commits, leaving the
+    * server with that many uncommitted deliveries on one queue. The test checks that the server is
+    * still started at that point and that it can afterwards be stopped within a few seconds.
+    */
+   @Test
+   public void clientCrashMassiveRollbackTest() throws Exception {
+      final String queueName = "queueName";
+      final int messageCount = 1000000;
+
+      // NOTE(review): this connection (and its factory) is never closed; presumably intentional so the
+      // transacted session stays uncommitted as a crashed client would leave it - confirm.
+      ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("(tcp://localhost:61616)");
+      factory.setConsumerWindowSize(-1);
+      factory.setConfirmationWindowSize(10240000);
+      Connection connection = factory.createConnection();
+      connection.start();
+
+      // Receives every message without ever committing, until the loop is broken by an exception.
+      Thread consumerThread = new Thread(() -> {
+         try {
+            Session consumerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
+            Queue destination = consumerSession.createQueue(queueName);
+            MessageConsumer consumer = consumerSession.createConsumer(destination);
+            for (;;) {
+               consumer.receive();
+            }
+         } catch (Exception ignored) {
+            // The receive loop can only end with an exception (e.g. after interrupt() below or once
+            // the connection is gone); that is the expected way for this thread to finish.
+         }
+      });
+
+      locator = createNettyNonHALocator();
+      locator.setConfirmationWindowSize(10240000);
+      sf = createSessionFactory(locator);
+      session = addClientSession(sf.createSession(false, true, true));
+      SendAcknowledgementHandler sendHandler = message -> {
+      };
+      session.setSendAcknowledgementHandler(sendHandler);
+      session.createQueue(new QueueConfiguration(queueName).setAddress(queueName).setRoutingType(RoutingType.ANYCAST));
+      ClientProducer producer = session.createProducer(queueName);
+      QueueControl queueControl = (QueueControl) server.getManagementService().getResource(ResourceNames.QUEUE + queueName);
+
+      consumerThread.start();
+
+      for (int i = 0; i < messageCount; i++) {
+         producer.send(session.createMessage(true));
+      }
+      producer.close();
+
+      // Bounded wait instead of an unbounded sleep loop: a stalled consumer now fails the test
+      // rather than hanging the build forever.
+      Wait.assertTrue(() -> queueControl.getDeliveringCount() >= messageCount, 300_000, 1000);
+
+      consumerThread.interrupt();
+
+      Assert.assertEquals(messageCount, queueControl.getMessageCount());
+      Assert.assertEquals(ActiveMQServer.SERVER_STATE.STARTED, server.getState());
+
+      server.stop();
+
+      Wait.assertEquals(ActiveMQServer.SERVER_STATE.STOPPED, server::getState, 5000, 100);
+   }
+
+}
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/LinkedListTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/LinkedListTest.java
index 2245e8b02c..f95e91fc2c 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/LinkedListTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/LinkedListTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.tests.unit.util;
+import java.lang.invoke.MethodHandles;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
@@ -38,27 +39,42 @@ import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class LinkedListTest extends ActiveMQTestBase {
+ private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private int scans = 0;
private LinkedListImpl<Integer> list;
@Override
@Before
public void setUp() throws Exception {
super.setUp();
- list = new LinkedListImpl<>(integerComparator);
+ // Anonymous subclass whose only purpose is to count how many times addSorted had to fall
+ // back to the linear addSortedScan; the tests assert on this counter.
+ list = new LinkedListImpl<>(integerComparator) {
+ @Override
+ protected boolean addSortedScan(Integer e) {
+ scans++;
+ return super.addSortedScan(e);
+ }
+ };
}
Comparator<Integer> integerComparator = new Comparator<Integer>() {
@Override
public int compare(Integer o1, Integer o2) {
+ logger.trace("Compare {} and {}", o1, o2);
if (o1.intValue() == o2.intValue()) {
+ logger.trace("Return 0");
return 0;
}
if (o2.intValue() > o1.intValue()) {
+ logger.trace("o2 is greater than, returning 1");
return 1;
} else {
+ logger.trace("o2 is lower than, returning -1");
return -1;
}
}
@@ -66,27 +82,68 @@ public class LinkedListTest extends ActiveMQTestBase {
+ // Inserts a short sequence in which every element is expected to be placed without a linear scan.
@Test
public void addSorted() {
+ Assert.assertEquals(0, scans); // nothing added yet, so no scan can have happened
list.addSorted(1);
list.addSorted(3);
list.addSorted(2);
list.addSorted(0);
+
+ Assert.assertEquals(0, scans); // 1 and 3 are expected at head/tail, 2 next to the last added node, 0 at the head: no scan needed
+
validateOrder(null);
Assert.assertEquals(4, list.size());
}
+ // Pins exactly when addSorted can use the head, the tail or the most recently added node, and
+ // when it has to fall back to a full scan. The scan counts depend on the exact insertion order.
+ @Test
+ public void addSortedCachedLast() {
+ Assert.assertEquals(0, scans); // nothing added yet
+ list.addSorted(5);
+ list.addSorted(1);
+ list.addSorted(3);
+ list.addSorted(4);
+ Assert.assertEquals(0, scans); // each element went to the head, the tail, or next to the previously added node
+ list.addSorted(2); // 2 belongs between 1 and 3, not next to the last added element (4): needs a scan
+ Assert.assertEquals(1, scans);
+ list.addSorted(10);
+ list.addSorted(20);
+ list.addSorted(7); // 7 belongs between 5 and 10, not next to the last added element (20): needs a second scan
+ Assert.assertEquals(2, scans);
+ printDebug();
+
+ validateOrder(null);
+
+ }
+
+ /** Logs every element of {@code list} at debug level, in iteration order. */
+ private void printDebug() {
+    if (logger.isDebugEnabled()) {
+       logger.debug("**** list output:");
+       // try-with-resources so the iterator is closed even if logging throws
+       try (LinkedListIterator<Integer> integerIterator = list.iterator()) {
+          while (integerIterator.hasNext()) {
+             logger.debug("list {}", integerIterator.next());
+          }
+       }
+    }
+ }
@Test
public void randomSorted() {
- HashSet<Integer> values = new HashSet<>();
- for (int i = 0; i < 1000; i++) {
+ int elements = 10_000;
- int value = RandomUtil.randomInt();
- if (!values.contains(value)) {
- values.add(value);
- list.addSorted(value);
+ HashSet<Integer> values = new HashSet<>();
+ for (int i = 0; i < elements; i++) {
+ for (;;) { // a retry loop, if a random give me the same value twice, I would retry
+ int value = RandomUtil.randomInt();
+ if (!values.contains(value)) { // validating if the random is repeated or not, and retrying otherwise
+ if (logger.isDebugEnabled()) {
+ logger.debug("Adding {}", value);
+ }
+ values.add(value);
+ list.addSorted(value);
+ break;
+ }
}
}
@@ -102,8 +159,8 @@ public class LinkedListTest extends ActiveMQTestBase {
Integer previous = null;
LinkedListIterator<Integer> integerIterator = list.iterator();
while (integerIterator.hasNext()) {
-
Integer value = integerIterator.next();
+ logger.debug("Reading {}", value);
if (previous != null) {
Assert.assertTrue(value + " should be > " + previous, integerComparator.compare(previous, value) > 0);
Assert.assertTrue(value + " should be > " + previous, value.intValue() > previous.intValue());