You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2022/03/14 19:26:09 UTC

[activemq-artemis] branch main updated: ARTEMIS-3591: stop paging checkMemory task executing twice

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 36dcb30  ARTEMIS-3591: stop paging checkMemory task executing twice
36dcb30 is described below

commit 36dcb30cda254d1ea7941bdc128836aac592a404
Author: Robbie Gemmell <ro...@apache.org>
AuthorDate: Mon Mar 14 17:15:16 2022 +0000

    ARTEMIS-3591: stop paging checkMemory task executing twice
---
 .../artemis/core/paging/impl/PagingStoreImpl.java  |  6 ++-
 tests/unit-tests/pom.xml                           |  5 ++
 .../unit/core/paging/impl/PagingStoreImplTest.java | 58 ++++++++++++++++++++++
 3 files changed, 67 insertions(+), 2 deletions(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
index 1b9b81f..e785889 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
@@ -717,7 +717,8 @@ public class PagingStoreImpl implements PagingStore {
                runWhenBlocking.run();
             }
 
-            onMemoryFreedRunnables.add(AtomicRunnable.checkAtomic(runWhenAvailable));
+            AtomicRunnable atomicRunWhenAvailable = AtomicRunnable.checkAtomic(runWhenAvailable);
+            onMemoryFreedRunnables.add(atomicRunWhenAvailable);
 
             // We check again to avoid a race condition where the size can come down just after the element
             // has been added, but the check to execute was done before the element was added
@@ -725,7 +726,7 @@ public class PagingStoreImpl implements PagingStore {
             // MUCH better performance in a highly concurrent environment
             if (!pagingManager.isGlobalFull() && (sizeInBytes.get() < maxSize || maxSize < 0)) {
                // run it now
-               runWhenAvailable.run();
+               atomicRunWhenAvailable.run();
             } else {
                if (usingGlobalMaxSize || pagingManager.isDiskFull()) {
                   pagingManager.addBlockedStore(this);
@@ -740,6 +741,7 @@ public class PagingStoreImpl implements PagingStore {
                   blocking = true;
                }
             }
+
             return true;
          }
       }
diff --git a/tests/unit-tests/pom.xml b/tests/unit-tests/pom.xml
index 405eb14..67b5ff4 100644
--- a/tests/unit-tests/pom.xml
+++ b/tests/unit-tests/pom.xml
@@ -170,6 +170,11 @@
          <version>${netty-tcnative-version}</version>
          <scope>test</scope>
       </dependency>
+      <dependency>
+        <groupId>org.mockito</groupId>
+        <artifactId>mockito-core</artifactId>
+        <scope>test</scope>
+      </dependency>
    </dependencies>
 
    <build>
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java
index 7728687..7188c1c 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java
@@ -72,6 +72,7 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import static org.apache.activemq.artemis.logs.AssertionLoggerHandler.findText;
 
@@ -1194,4 +1195,61 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
       public void freeDirectuffer(final ByteBuffer buffer) {
       }
    }
+
+   private static final class CountingRunnable implements Runnable {
+      final AtomicInteger calls = new AtomicInteger();
+
+      @Override
+      public void run() {
+         calls.incrementAndGet();
+      }
+
+      public int getCount() {
+         return calls.get();
+      }
+   }
+
+   @Test(timeout = 10000)
+   public void testCheckExecutionIsNotRepeated() throws Exception {
+      SequentialFileFactory factory = new FakeSequentialFileFactory();
+
+      PagingStoreFactory storeFactory = new FakeStoreFactory(factory);
+
+      PagingManager mockManager = Mockito.mock(PagingManager.class);
+
+      ArtemisExecutor sameThreadExecutor = Runnable::run;
+      PagingStoreImpl store = new PagingStoreImpl(PagingStoreImplTest.destinationTestName, null, 100,
+                                                  mockManager, createStorageManagerMock(), factory, storeFactory,
+                                                  PagingStoreImplTest.destinationTestName,
+                                                  new AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK),
+                                                  sameThreadExecutor, true);
+
+      store.start();
+      try {
+         store.applySetting(new AddressSettings().setMaxSizeBytes(1000).setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK));
+
+         Mockito.when(mockManager.addSize(Mockito.anyInt())).thenReturn(mockManager);
+         store.addSize(100);
+
+         // Do an initial check
+         final CountingRunnable trackMemoryCheck1 = new CountingRunnable();
+         assertEquals(0, trackMemoryCheck1.getCount());
+         store.checkMemory(trackMemoryCheck1);
+         assertEquals(1, trackMemoryCheck1.getCount());
+
+         // Do another check, this time indicate the disk is full during the first couple
+         // requests, making the task initially be retained for later but then executed.
+         final CountingRunnable trackMemoryCheck2 = new CountingRunnable();
+         Mockito.when(mockManager.isDiskFull()).thenReturn(true, true, false);
+         assertEquals(0, trackMemoryCheck2.getCount());
+         store.checkMemory(trackMemoryCheck2);
+         assertEquals(1, trackMemoryCheck2.getCount());
+
+         // Now run the released memory checks. The task should NOT execute again, verify it doesnt.
+         store.checkReleasedMemory();
+         assertEquals(1, trackMemoryCheck2.getCount());
+      } finally {
+         store.stop();
+      }
+   }
 }