You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2022/10/26 01:26:21 UTC
[activemq-artemis] branch main updated: ARTEMIS-4073 Page Counters can go off sync when multiple producers are used in the same address
This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new d185735e55 ARTEMIS-4073 Page Counters can go off sync when multiple producers are used in the same address
d185735e55 is described below
commit d185735e550a79c6c58d7d272fa816fae18f11ea
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Tue Oct 25 12:05:27 2022 -0400
ARTEMIS-4073 Page Counters can go off sync when multiple producers are used in the same address
---
.../paging/cursor/PageSubscriptionCounter.java | 6 --
.../cursor/impl/PageSubscriptionCounterImpl.java | 68 ++++++++-----
.../integration/paging/PagingCounterTest.java | 106 +++++++++++++++++----
.../tests/soak/paging/M_and_M_FactoryTest.java | 2 -
4 files changed, 132 insertions(+), 50 deletions(-)
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscriptionCounter.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscriptionCounter.java
index 33b744f606..75eedfa9ee 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscriptionCounter.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscriptionCounter.java
@@ -43,12 +43,6 @@ public interface PageSubscriptionCounter {
*/
void processReload();
- /**
- * @param id
- * @param variance
- */
- void addInc(long id, int variance, long size);
-
// used when deleting the counter
void delete() throws Exception;
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
index 0e7ab26dd0..07ca0373b6 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
@@ -20,10 +20,12 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
+import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
import org.apache.activemq.artemis.core.paging.impl.Page;
@@ -78,6 +80,8 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
private LinkedList<PendingCounter> loadList;
+ private final Executor pageExecutor;
+
public PageSubscriptionCounterImpl(final StorageManager storage,
final PageSubscription subscription,
final boolean persistent,
@@ -86,6 +90,12 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
this.storage = storage;
this.persistent = persistent;
this.subscription = subscription;
+ if (subscription == null) {
+ this.pageExecutor = null;
+ } else {
+ this.pageExecutor = subscription.getPagingStore().getExecutor();
+ assert pageExecutor != null;
+ }
}
@Override
@@ -175,13 +185,23 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
}
@Override
- public void increment(Transaction tx, int add, long size) throws Exception {
+ public synchronized void increment(Transaction tx, int add, long size) throws Exception {
if (tx == null) {
if (persistent) {
long id = storage.storePageCounterInc(this.subscriptionID, add, size);
- incrementProcessed(id, add, size);
+ storage.getContext().executeOnCompletion(new IOCallback() {
+ @Override
+ public void done() {
+ process(id, add, size);
+ }
+
+ @Override
+ public void onError(int errorCode, String errorMessage) {
+
+ }
+ });
} else {
- incrementProcessed(-1, add, size);
+ process(-1, add, size);
}
} else {
if (persistent) {
@@ -227,12 +247,12 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
this.recordID = recordID1;
}
- public synchronized void incrementProcessed(long id, int add, long size) {
- addInc(id, add, size);
- if (incrementRecords.size() > FLUSH_COUNTER) {
- this.subscription.getPagingStore().execute(this::cleanup);
+ private void process(long id, int add, long size) {
+ if (id >= 0 && pageExecutor != null) {
+ pageExecutor.execute(() -> doIncrement(id, add, size));
+ } else {
+ doIncrement(-1, add, size);
}
-
}
@Override
@@ -295,8 +315,8 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
}
}
- @Override
- public synchronized void addInc(long id, int variance, long size) {
+ // you need to call this method from the executors when id >= 0
+ private synchronized void doIncrement(long id, int variance, long size) {
value.addAndGet(variance);
this.persistentSize.addAndGet(size);
if (variance > 0) {
@@ -307,6 +327,9 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
}
if (id >= 0) {
incrementRecords.add(id);
+ if (incrementRecords.size() > FLUSH_COUNTER) {
+ this.cleanup();
+ }
}
}
@@ -320,21 +343,16 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
/**
* This method should always be called from a single threaded executor
*/
- protected void cleanup() {
- ArrayList<Long> deleteList;
-
- long valueReplace;
- long sizeReplace;
- synchronized (this) {
- if (incrementRecords.size() <= FLUSH_COUNTER) {
- return;
- }
- valueReplace = value.get();
- sizeReplace = persistentSize.get();
- deleteList = new ArrayList<>(incrementRecords);
- incrementRecords.clear();
+ protected synchronized void cleanup() {
+ if (incrementRecords.size() <= FLUSH_COUNTER) {
+ return;
}
+ long valueReplace = value.get();
+ long sizeReplace = persistentSize.get();
+ ArrayList<Long> deleteList = new ArrayList<>(incrementRecords);
+ incrementRecords.clear();
+
long newRecordID = -1;
long txCleanup = storage.generateID();
@@ -352,7 +370,7 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
if (logger.isTraceEnabled()) {
logger.trace("Replacing page-counter record = {} by record = {} on subscriptionID = {} for queue = {}",
- recordID, newRecordID, subscriptionID, subscription.getQueue().getName());
+ recordID, newRecordID, subscriptionID, subscription.getQueue().getName());
}
storage.commit(txCleanup);
@@ -394,7 +412,7 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
@Override
public void afterCommit(Transaction tx) {
for (ItemOper oper : operations) {
- oper.counter.incrementProcessed(oper.id, oper.amount, oper.persistentSize);
+ oper.counter.process(oper.id, oper.amount, oper.persistentSize);
}
}
}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingCounterTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingCounterTest.java
index 3dc6c10379..f68608e24a 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingCounterTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingCounterTest.java
@@ -18,6 +18,14 @@ package org.apache.activemq.artemis.tests.integration.paging;
import javax.transaction.xa.Xid;
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientSession;
@@ -37,14 +45,17 @@ import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class PagingCounterTest extends ActiveMQTestBase {
-
+ private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private ActiveMQServer server;
@@ -86,15 +97,80 @@ public class PagingCounterTest extends ActiveMQTestBase {
counter.increment(tx, 1, 1000);
- assertEquals(0, counter.getValue());
- assertEquals(0, counter.getPersistentSize());
+ Wait.assertEquals(0, counter::getValue);
+ Wait.assertEquals(0, counter::getPersistentSize);
tx.commit();
- storage.waitOnOperations();
+ Wait.assertEquals(1, counter::getValue);
+ Wait.assertEquals(1000, counter::getPersistentSize);
+ } finally {
+ sf.close();
+ session.close();
+ }
+ }
+
+ @Test
+ public void testMultiThreadUpdates() throws Exception {
+ ClientSessionFactory sf = createSessionFactory(sl);
+ ClientSession session = sf.createSession();
+ AtomicInteger errors = new AtomicInteger(0);
+
+ try {
+ server.addAddressInfo(new AddressInfo(new SimpleString("A1"), RoutingType.ANYCAST));
+ Queue queue = server.createQueue(new QueueConfiguration(new SimpleString("A1")).setRoutingType(RoutingType.ANYCAST));
+
+ final PageSubscriptionCounter counter = locateCounter(queue);
+
+ final int THREADS = 10;
+
+ final CyclicBarrier flagStart = new CyclicBarrier(THREADS);
+ final CountDownLatch done = new CountDownLatch(THREADS);
+
+ final int BUMPS = 2000;
+
+ Assert.assertEquals(0, counter.getValue());
+
+ ExecutorService executorService = Executors.newFixedThreadPool(THREADS);
+ runAfter(executorService::shutdownNow);
+
+ for (int i = 0; i < THREADS; i++) {
+ executorService.execute(() -> {
+ try {
+ flagStart.await(10, TimeUnit.SECONDS);
+ for (int repeat = 0; repeat < BUMPS; repeat++) {
+ counter.increment(null, 2, 1L);
+ Transaction tx = new TransactionImpl(server.getStorageManager());
+ counter.increment(tx, 1, 1L);
+ tx.commit();
+ counter.increment(null, -1, -1L);
+ tx = new TransactionImpl(server.getStorageManager());
+ counter.increment(tx, -1, -1L);
+ tx.commit();
+ }
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ errors.incrementAndGet();
+ } finally {
+ done.countDown();
+ }
+ });
+ }
+
+ // it should take a couple seconds only
+ done.await(1, TimeUnit.MINUTES);
+
+ Wait.assertEquals((long)(BUMPS * THREADS), counter::getValue, 5000, 100);
+
+ server.stop();
+
+ server.start();
+
+ queue = server.locateQueue("A1");
+
+ final PageSubscriptionCounter counterAfterRestart = locateCounter(queue);
+ Wait.assertEquals((long)(BUMPS * THREADS), counterAfterRestart::getValue, 5000, 100);
- assertEquals(1, counter.getValue());
- assertEquals(1000, counter.getPersistentSize());
} finally {
sf.close();
session.close();
@@ -125,8 +201,8 @@ public class PagingCounterTest extends ActiveMQTestBase {
storage.waitOnOperations();
- assertEquals(i + 1, counter.getValue());
- assertEquals((i + 1) * 1000, counter.getPersistentSize());
+ Wait.assertEquals(i + 1, counter::getValue);
+ Wait.assertEquals((i + 1) * 1000, counter::getPersistentSize);
tx = new TransactionImpl(server.getStorageManager());
}
@@ -134,10 +210,8 @@ public class PagingCounterTest extends ActiveMQTestBase {
tx.commit();
- storage.waitOnOperations();
-
- assertEquals(2100, counter.getValue());
- assertEquals(2100 * 1000, counter.getPersistentSize());
+ Wait.assertEquals(2100, counter::getValue);
+ Wait.assertEquals(2100 * 1000, counter::getPersistentSize);
server.stop();
@@ -240,10 +314,8 @@ public class PagingCounterTest extends ActiveMQTestBase {
tx.commit();
- storage.waitOnOperations();
-
- assertEquals(1, counter.getValue());
- assertEquals(1000, counter.getPersistentSize());
+ Wait.assertEquals(1, counter::getValue);
+ Wait.assertEquals(1000, counter::getPersistentSize);
sl.close();
@@ -324,7 +396,7 @@ public class PagingCounterTest extends ActiveMQTestBase {
storage.waitOnOperations();
- assertEquals(2000, counter.getValue());
+ Wait.assertEquals(2000, counter::getValue);
}
diff --git a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/paging/M_and_M_FactoryTest.java b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/paging/M_and_M_FactoryTest.java
index b400768eef..e0ccfab519 100644
--- a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/paging/M_and_M_FactoryTest.java
+++ b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/paging/M_and_M_FactoryTest.java
@@ -249,9 +249,7 @@ public class M_and_M_FactoryTest extends SoakTestBase {
}, 45_000, 1_000);
expectedTotalSize += BATCH_SIZE * 2;
- Wait.assertEquals(expectedTotalSize, queueControl::getMessagesAdded);
- Wait.assertEquals(expectedTotalSize, () -> queueControl.getMessagesAcknowledged() + queueControl.getMessagesKilled());
retryNumber.incrementAndGet();
for (Process c : consumers) {