You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2022/10/26 01:26:21 UTC

[activemq-artemis] branch main updated: ARTEMIS-4073 Page Counters can go off sync when multiple producers are used in the same address

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new d185735e55 ARTEMIS-4073 Page Counters can go off sync when multiple producers are used in the same address
d185735e55 is described below

commit d185735e550a79c6c58d7d272fa816fae18f11ea
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Tue Oct 25 12:05:27 2022 -0400

    ARTEMIS-4073 Page Counters can go off sync when multiple producers are used in the same address
---
 .../paging/cursor/PageSubscriptionCounter.java     |   6 --
 .../cursor/impl/PageSubscriptionCounterImpl.java   |  68 ++++++++-----
 .../integration/paging/PagingCounterTest.java      | 106 +++++++++++++++++----
 .../tests/soak/paging/M_and_M_FactoryTest.java     |   2 -
 4 files changed, 132 insertions(+), 50 deletions(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscriptionCounter.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscriptionCounter.java
index 33b744f606..75eedfa9ee 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscriptionCounter.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscriptionCounter.java
@@ -43,12 +43,6 @@ public interface PageSubscriptionCounter {
     */
    void processReload();
 
-   /**
-    * @param id
-    * @param variance
-    */
-   void addInc(long id, int variance, long size);
-
    // used when deleting the counter
    void delete() throws Exception;
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
index 0e7ab26dd0..07ca0373b6 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
@@ -20,10 +20,12 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.Map;
+import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 
+import org.apache.activemq.artemis.core.io.IOCallback;
 import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
 import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
 import org.apache.activemq.artemis.core.paging.impl.Page;
@@ -78,6 +80,8 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
 
    private LinkedList<PendingCounter> loadList;
 
+   private final Executor pageExecutor;
+
    public PageSubscriptionCounterImpl(final StorageManager storage,
                                       final PageSubscription subscription,
                                       final boolean persistent,
@@ -86,6 +90,12 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
       this.storage = storage;
       this.persistent = persistent;
       this.subscription = subscription;
+      if (subscription == null) {
+         this.pageExecutor = null;
+      } else {
+         this.pageExecutor = subscription.getPagingStore().getExecutor();
+         assert pageExecutor != null;
+      }
    }
 
    @Override
@@ -175,13 +185,23 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
    }
 
    @Override
-   public void increment(Transaction tx, int add, long size) throws Exception {
+   public synchronized void increment(Transaction tx, int add, long size) throws Exception {
       if (tx == null) {
          if (persistent) {
             long id = storage.storePageCounterInc(this.subscriptionID, add, size);
-            incrementProcessed(id, add, size);
+            storage.getContext().executeOnCompletion(new IOCallback() {
+               @Override
+               public void done() {
+                  process(id, add, size);
+               }
+
+               @Override
+               public void onError(int errorCode, String errorMessage) {
+
+               }
+            });
          } else {
-            incrementProcessed(-1, add, size);
+            process(-1, add, size);
          }
       } else {
          if (persistent) {
@@ -227,12 +247,12 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
       this.recordID = recordID1;
    }
 
-   public synchronized void incrementProcessed(long id, int add, long size) {
-      addInc(id, add, size);
-      if (incrementRecords.size() > FLUSH_COUNTER) {
-         this.subscription.getPagingStore().execute(this::cleanup);
+   private void process(long id, int add, long size) {
+      if (id >= 0 && pageExecutor != null) {
+         pageExecutor.execute(() -> doIncrement(id, add, size));
+      } else {
+         doIncrement(-1, add, size);
       }
-
    }
 
    @Override
@@ -295,8 +315,8 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
       }
    }
 
-   @Override
-   public synchronized void addInc(long id, int variance, long size) {
+   // you need to call this method from the executors when id >= 0
+   private synchronized void doIncrement(long id, int variance, long size) {
       value.addAndGet(variance);
       this.persistentSize.addAndGet(size);
       if (variance > 0) {
@@ -307,6 +327,9 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
       }
       if (id >= 0) {
          incrementRecords.add(id);
+         if (incrementRecords.size() > FLUSH_COUNTER) {
+            this.cleanup();
+         }
       }
    }
 
@@ -320,21 +343,16 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
    /**
     * This method should always be called from a single threaded executor
     */
-   protected void cleanup() {
-      ArrayList<Long> deleteList;
-
-      long valueReplace;
-      long sizeReplace;
-      synchronized (this) {
-         if (incrementRecords.size() <= FLUSH_COUNTER) {
-            return;
-         }
-         valueReplace = value.get();
-         sizeReplace = persistentSize.get();
-         deleteList = new ArrayList<>(incrementRecords);
-         incrementRecords.clear();
+   protected synchronized void cleanup() {
+      if (incrementRecords.size() <= FLUSH_COUNTER) {
+         return;
       }
 
+      long valueReplace = value.get();
+      long sizeReplace = persistentSize.get();
+      ArrayList<Long> deleteList = new ArrayList<>(incrementRecords);
+      incrementRecords.clear();
+
       long newRecordID = -1;
 
       long txCleanup = storage.generateID();
@@ -352,7 +370,7 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
 
          if (logger.isTraceEnabled()) {
             logger.trace("Replacing page-counter record = {} by record = {} on subscriptionID = {} for queue = {}",
-               recordID, newRecordID, subscriptionID, subscription.getQueue().getName());
+                         recordID, newRecordID, subscriptionID, subscription.getQueue().getName());
          }
 
          storage.commit(txCleanup);
@@ -394,7 +412,7 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
       @Override
       public void afterCommit(Transaction tx) {
          for (ItemOper oper : operations) {
-            oper.counter.incrementProcessed(oper.id, oper.amount, oper.persistentSize);
+            oper.counter.process(oper.id, oper.amount, oper.persistentSize);
          }
       }
    }
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingCounterTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingCounterTest.java
index 3dc6c10379..f68608e24a 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingCounterTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingCounterTest.java
@@ -18,6 +18,14 @@ package org.apache.activemq.artemis.tests.integration.paging;
 
 import javax.transaction.xa.Xid;
 
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.activemq.artemis.api.core.QueueConfiguration;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ClientSession;
@@ -37,14 +45,17 @@ import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
 import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.Wait;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class PagingCounterTest extends ActiveMQTestBase {
 
-
+   private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
    private ActiveMQServer server;
 
@@ -86,15 +97,80 @@ public class PagingCounterTest extends ActiveMQTestBase {
 
          counter.increment(tx, 1, 1000);
 
-         assertEquals(0, counter.getValue());
-         assertEquals(0, counter.getPersistentSize());
+         Wait.assertEquals(0, counter::getValue);
+         Wait.assertEquals(0, counter::getPersistentSize);
 
          tx.commit();
 
-         storage.waitOnOperations();
+         Wait.assertEquals(1, counter::getValue);
+         Wait.assertEquals(1000, counter::getPersistentSize);
+      } finally {
+         sf.close();
+         session.close();
+      }
+   }
+
+   @Test
+   public void testMultiThreadUpdates() throws Exception {
+      ClientSessionFactory sf = createSessionFactory(sl);
+      ClientSession session = sf.createSession();
+      AtomicInteger errors = new AtomicInteger(0);
+
+      try {
+         server.addAddressInfo(new AddressInfo(new SimpleString("A1"), RoutingType.ANYCAST));
+         Queue queue = server.createQueue(new QueueConfiguration(new SimpleString("A1")).setRoutingType(RoutingType.ANYCAST));
+
+         final PageSubscriptionCounter counter = locateCounter(queue);
+
+         final int THREADS = 10;
+
+         final CyclicBarrier flagStart = new CyclicBarrier(THREADS);
+         final CountDownLatch done = new CountDownLatch(THREADS);
+
+         final int BUMPS = 2000;
+
+         Assert.assertEquals(0, counter.getValue());
+
+         ExecutorService executorService = Executors.newFixedThreadPool(THREADS);
+         runAfter(executorService::shutdownNow);
+
+         for (int i = 0; i < THREADS; i++) {
+            executorService.execute(() -> {
+               try {
+                  flagStart.await(10, TimeUnit.SECONDS);
+                  for (int repeat = 0; repeat < BUMPS; repeat++) {
+                     counter.increment(null, 2, 1L);
+                     Transaction tx = new TransactionImpl(server.getStorageManager());
+                     counter.increment(tx, 1, 1L);
+                     tx.commit();
+                     counter.increment(null, -1, -1L);
+                     tx = new TransactionImpl(server.getStorageManager());
+                     counter.increment(tx, -1, -1L);
+                     tx.commit();
+                  }
+               } catch (Exception e) {
+                  logger.warn(e.getMessage(), e);
+                  errors.incrementAndGet();
+               } finally {
+                  done.countDown();
+               }
+            });
+         }
+
+         // it should only take a couple of seconds
+         done.await(1, TimeUnit.MINUTES);
+
+         Wait.assertEquals((long)(BUMPS * THREADS), counter::getValue, 5000, 100);
+
+         server.stop();
+
+         server.start();
+
+         queue = server.locateQueue("A1");
+
+         final PageSubscriptionCounter counterAfterRestart = locateCounter(queue);
+         Wait.assertEquals((long)(BUMPS * THREADS), counterAfterRestart::getValue, 5000, 100);
 
-         assertEquals(1, counter.getValue());
-         assertEquals(1000, counter.getPersistentSize());
       } finally {
          sf.close();
          session.close();
@@ -125,8 +201,8 @@ public class PagingCounterTest extends ActiveMQTestBase {
 
                storage.waitOnOperations();
 
-               assertEquals(i + 1, counter.getValue());
-               assertEquals((i + 1) * 1000, counter.getPersistentSize());
+               Wait.assertEquals(i + 1, counter::getValue);
+               Wait.assertEquals((i + 1) * 1000,  counter::getPersistentSize);
 
                tx = new TransactionImpl(server.getStorageManager());
             }
@@ -134,10 +210,8 @@ public class PagingCounterTest extends ActiveMQTestBase {
 
          tx.commit();
 
-         storage.waitOnOperations();
-
-         assertEquals(2100, counter.getValue());
-         assertEquals(2100 * 1000, counter.getPersistentSize());
+         Wait.assertEquals(2100, counter::getValue);
+         Wait.assertEquals(2100 * 1000, counter::getPersistentSize);
 
          server.stop();
 
@@ -240,10 +314,8 @@ public class PagingCounterTest extends ActiveMQTestBase {
 
       tx.commit();
 
-      storage.waitOnOperations();
-
-      assertEquals(1, counter.getValue());
-      assertEquals(1000, counter.getPersistentSize());
+      Wait.assertEquals(1, counter::getValue);
+      Wait.assertEquals(1000, counter::getPersistentSize);
 
       sl.close();
 
@@ -324,7 +396,7 @@ public class PagingCounterTest extends ActiveMQTestBase {
 
       storage.waitOnOperations();
 
-      assertEquals(2000, counter.getValue());
+      Wait.assertEquals(2000, counter::getValue);
 
    }
 
diff --git a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/paging/M_and_M_FactoryTest.java b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/paging/M_and_M_FactoryTest.java
index b400768eef..e0ccfab519 100644
--- a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/paging/M_and_M_FactoryTest.java
+++ b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/paging/M_and_M_FactoryTest.java
@@ -249,9 +249,7 @@ public class M_and_M_FactoryTest extends SoakTestBase {
                }, 45_000, 1_000);
 
                expectedTotalSize += BATCH_SIZE * 2;
-               Wait.assertEquals(expectedTotalSize, queueControl::getMessagesAdded);
 
-               Wait.assertEquals(expectedTotalSize, () -> queueControl.getMessagesAcknowledged() + queueControl.getMessagesKilled());
                retryNumber.incrementAndGet();
 
                for (Process c : consumers) {