You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2021/12/11 03:24:56 UTC

[activemq-artemis] branch main updated: ARTEMIS-3604 Small test fix on ThresholdActorTest

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new af13d90  ARTEMIS-3604 Small test fix on ThresholdActorTest
af13d90 is described below

commit af13d90c5747489346965189972adecbb37f4e11
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Fri Dec 10 22:24:07 2021 -0500

    ARTEMIS-3604 Small test fix on ThresholdActorTest
---
 .../artemis/utils/actors/ThresholdActorTest.java   | 36 +++++++++++++++++-----
 1 file changed, 28 insertions(+), 8 deletions(-)

diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/ThresholdActorTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/ThresholdActorTest.java
index 5c715ec..01e5471 100644
--- a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/ThresholdActorTest.java
+++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/ThresholdActorTest.java
@@ -17,9 +17,11 @@
 
 package org.apache.activemq.artemis.utils.actors;
 
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -119,7 +121,7 @@ public class ThresholdActorTest {
 
    public void block() {
       try {
-         if (!semaphore.tryAcquire()) {
+         if (!semaphore.tryAcquire(10, TimeUnit.SECONDS)) {
             errors.incrementAndGet();
             System.err.println("acquire failed");
          }
@@ -128,18 +130,36 @@ public class ThresholdActorTest {
       }
    }
 
+   public void unblock() {
+      semaphore.release();
+   }
+
    @Test
    public void testFlow() throws Exception {
-      final ExecutorService executorService = Executors.newSingleThreadExecutor();
+      final ExecutorService executorService = Executors.newFixedThreadPool(2);
 
       try {
-         ThresholdActor<Element> actor = new ThresholdActor<>(executorService, this::process, 20, (e) -> e.size, this::block, semaphore::release);
-
-         final int LAST_ELEMENT = 1000;
+         ThresholdActor<Element> actor = new ThresholdActor<>(executorService, this::process, 20, (e) -> e.size, this::block, this::unblock);
+
+         final int LAST_ELEMENT = 1111;
+
+         final CountDownLatch latchDone = new CountDownLatch(1);
+
+         executorService.execute(() -> {
+            for (int i = 0; i <= LAST_ELEMENT; i++) {
+               try {
+                  semaphore.acquire();
+                  semaphore.release();
+                  actor.act(new Element(i, i % 2 == 0 ? 20 : 1));
+               } catch (Exception e) {
+                  e.printStackTrace();
+                  errors.incrementAndGet();
+               }
+            }
+            latchDone.countDown();
+         });
 
-         for (int i = 0; i <= LAST_ELEMENT; i++) {
-            actor.act(new Element(i, i % 2 == 0 ? 20 : 1));
-         }
+         Assert.assertTrue(latchDone.await(10, TimeUnit.SECONDS));
 
          Wait.assertEquals(LAST_ELEMENT, lastProcessed::get);
          Assert.assertEquals(0, errors.get());