You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by rc...@apache.org on 2022/09/19 02:58:32 UTC

[james-project] 07/07: JAMES-3816 ReactiveThrottlerTest: Remove unnecessary subscribeOn call

This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 1d4af4a5bfaae9b1a312cdceaad82e0f0b3f7a51
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri Sep 16 21:51:18 2022 +0700

    JAMES-3816 ReactiveThrottlerTest: Remove unnecessary subscribeOn call
    
    Asynchronous scheduling is not needed and causes test instability
    
    Not needed
---
 .../imapserver/netty/ReactiveThrottlerTest.java    | 37 +++++++++++-----------
 1 file changed, 18 insertions(+), 19 deletions(-)

diff --git a/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/ReactiveThrottlerTest.java b/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/ReactiveThrottlerTest.java
index 3b767f61cc..d31dc142c6 100644
--- a/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/ReactiveThrottlerTest.java
+++ b/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/ReactiveThrottlerTest.java
@@ -32,7 +32,6 @@ import org.junit.jupiter.api.Test;
 import org.testcontainers.shaded.org.awaitility.Awaitility;
 
 import reactor.core.publisher.Mono;
-import reactor.core.scheduler.Schedulers;
 
 class ReactiveThrottlerTest {
     @Test
@@ -55,9 +54,9 @@ class ReactiveThrottlerTest {
 
         // When I submit many tasks task
         AtomicBoolean executed = new AtomicBoolean(false);
-        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(200)).then())).subscribeOn(Schedulers.boundedElastic()).subscribe();
-        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(200)).then())).subscribeOn(Schedulers.boundedElastic()).subscribe();
-        Mono.from(testee.throttle(Mono.fromRunnable(() -> executed.getAndSet(true)))).subscribeOn(Schedulers.boundedElastic()).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(200)).then())).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(200)).then())).subscribe();
+        Mono.from(testee.throttle(Mono.fromRunnable(() -> executed.getAndSet(true)))).subscribe();
 
         // Then that task is not executed straight away
         assertThat(executed.get()).isFalse();
@@ -70,9 +69,9 @@ class ReactiveThrottlerTest {
 
         // When I submit many tasks task
         AtomicBoolean executed = new AtomicBoolean(false);
-        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then())).subscribeOn(Schedulers.boundedElastic()).subscribe();
-        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then())).subscribeOn(Schedulers.boundedElastic()).subscribe();
-        Mono.from(testee.throttle(Mono.fromRunnable(() -> executed.getAndSet(true)))).subscribeOn(Schedulers.boundedElastic()).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then())).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then())).subscribe();
+        Mono.from(testee.throttle(Mono.fromRunnable(() -> executed.getAndSet(true)))).subscribe();
 
         // Then that task is eventually executed
         Awaitility.await().atMost(Duration.ofSeconds(10))
@@ -86,8 +85,8 @@ class ReactiveThrottlerTest {
 
         // When I await a submitted task execution and it is queued
         AtomicBoolean executed = new AtomicBoolean(false);
-        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then())).subscribeOn(Schedulers.boundedElastic()).subscribe();
-        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then())).subscribeOn(Schedulers.boundedElastic()).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then())).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then())).subscribe();
         Mono.from(testee.throttle(Mono.fromRunnable(() -> executed.getAndSet(true)))).block();
 
         // Then when done that task have been executed
@@ -101,10 +100,10 @@ class ReactiveThrottlerTest {
 
         // When I submit too many tasks task
         AtomicBoolean executed = new AtomicBoolean(false);
-        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then())).subscribeOn(Schedulers.boundedElastic()).subscribe();
-        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then())).subscribeOn(Schedulers.boundedElastic()).subscribe();
-        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then())).subscribeOn(Schedulers.boundedElastic()).subscribe();
-        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then())).subscribeOn(Schedulers.boundedElastic()).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then())).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then())).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then())).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then())).subscribe();
 
         // Then extra tasks are rejected
         assertThatThrownBy(() -> Mono.from(testee.throttle(Mono.fromRunnable(() -> executed.getAndSet(true)))).block())
@@ -122,9 +121,9 @@ class ReactiveThrottlerTest {
         AtomicBoolean executed = new AtomicBoolean(false);
         Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(100)).then()))
             .then(Mono.fromRunnable(() -> executed.getAndSet(true)))
-            .subscribeOn(Schedulers.boundedElastic())
+
             .subscribe();
-        Mono.from(testee.throttle(Mono.delay(Duration.ofSeconds(2)).then())).subscribeOn(Schedulers.boundedElastic()).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofSeconds(2)).then())).subscribe();
 
         // Then extra tasks are rejected
         Thread.sleep(200);
@@ -148,10 +147,10 @@ class ReactiveThrottlerTest {
                 int i = concurrentTasks.getAndDecrement();
                 concurrentTasksCountSnapshots.add(i);
             }));
-        Mono.from(testee.throttle(operation)).subscribeOn(Schedulers.boundedElastic()).subscribe();
-        Mono.from(testee.throttle(operation)).subscribeOn(Schedulers.boundedElastic()).subscribe();
-        Mono.from(testee.throttle(operation)).subscribeOn(Schedulers.boundedElastic()).subscribe();
-        Mono.from(testee.throttle(operation)).subscribeOn(Schedulers.boundedElastic()).subscribe();
+        Mono.from(testee.throttle(operation)).subscribe();
+        Mono.from(testee.throttle(operation)).subscribe();
+        Mono.from(testee.throttle(operation)).subscribe();
+        Mono.from(testee.throttle(operation)).subscribe();
 
         // Then maximum parallelism is not exceeded
         Awaitility.await().untilAsserted(() -> assertThat(concurrentTasksCountSnapshots.size()).isEqualTo(8));


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org