You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by rc...@apache.org on 2022/09/19 02:58:25 UTC

[james-project] branch master updated (066e2c98e7 -> 1d4af4a5bf)

This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


    from 066e2c98e7 JAMES-2656 - Add initial JPAMailRepository implementation (#1176)
     new 7897213d55 JAMES-3816 throttler for reactive workloads
     new 058579a67c JAMES-3816 Plug ReactiveThrottler into IMAPServer
     new 72c5ebbb3b JAMES-3816 Allow turning off ReactiveThrottler
     new f388f029c8 JAMES-3816 Documentation for IMAP throttling
     new da71b92cf4 JAMES-3816 IMAP throttler should not await nested tasks
     new 5b6b9accf4 JAMES-3816 Start nested IMAP requests without switching threads
     new 1d4af4a5bf JAMES-3816 ReactiveThrottlerTest: Remove unecessary subscribeOn call

The 7 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/james/imap/api/ImapConfiguration.java   |  43 +++++-
 .../docs/modules/ROOT/pages/configure/imap.adoc    |   8 ++
 .../apache/james/imapserver/netty/IMAPServer.java  |  18 ++-
 .../james/imapserver/netty/IMAPServerFactory.java  |   7 +-
 .../netty/ImapChannelUpstreamHandler.java          |  84 +++++------
 .../james/imapserver/netty/ReactiveThrottler.java  |  89 ++++++++++++
 .../META-INF/spring/imapserver-context.xml         |   1 +
 .../netty/IMAPServerConfigurationTest.java         |   6 +
 .../james/imapserver/netty/IMAPServerTest.java     |   4 +-
 .../imapserver/netty/ReactiveThrottlerTest.java    | 160 +++++++++++++++++++++
 src/site/xdoc/server/config-imap4.xml              |   6 +
 11 files changed, 373 insertions(+), 53 deletions(-)
 create mode 100644 server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ReactiveThrottler.java
 create mode 100644 server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/ReactiveThrottlerTest.java


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 07/07: JAMES-3816 ReactiveThrottlerTest: Remove unecessary subscribeOn call

Posted by rc...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 1d4af4a5bfaae9b1a312cdceaad82e0f0b3f7a51
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri Sep 16 21:51:18 2022 +0700

    JAMES-3816 ReactiveThrottlerTest: Remove unecessary subscribeOn call
    
    Asynchronous scheduling is not needed and causes test instability
    
    Not needed
---
 .../imapserver/netty/ReactiveThrottlerTest.java    | 37 +++++++++++-----------
 1 file changed, 18 insertions(+), 19 deletions(-)

diff --git a/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/ReactiveThrottlerTest.java b/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/ReactiveThrottlerTest.java
index 3b767f61cc..d31dc142c6 100644
--- a/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/ReactiveThrottlerTest.java
+++ b/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/ReactiveThrottlerTest.java
@@ -32,7 +32,6 @@ import org.junit.jupiter.api.Test;
 import org.testcontainers.shaded.org.awaitility.Awaitility;
 
 import reactor.core.publisher.Mono;
-import reactor.core.scheduler.Schedulers;
 
 class ReactiveThrottlerTest {
     @Test
@@ -55,9 +54,9 @@ class ReactiveThrottlerTest {
 
         // When I submit many tasks task
         AtomicBoolean executed = new AtomicBoolean(false);
-        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(200)).then())).subscribeOn(Schedulers.boundedElastic()).subscribe();
-        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(200)).then())).subscribeOn(Schedulers.boundedElastic()).subscribe();
-        Mono.from(testee.throttle(Mono.fromRunnable(() -> executed.getAndSet(true)))).subscribeOn(Schedulers.boundedElastic()).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(200)).then())).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(200)).then())).subscribe();
+        Mono.from(testee.throttle(Mono.fromRunnable(() -> executed.getAndSet(true)))).subscribe();
 
         // Then that task is not executed straight away
         assertThat(executed.get()).isFalse();
@@ -70,9 +69,9 @@ class ReactiveThrottlerTest {
 
         // When I submit many tasks task
         AtomicBoolean executed = new AtomicBoolean(false);
-        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then())).subscribeOn(Schedulers.boundedElastic()).subscribe();
-        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then())).subscribeOn(Schedulers.boundedElastic()).subscribe();
-        Mono.from(testee.throttle(Mono.fromRunnable(() -> executed.getAndSet(true)))).subscribeOn(Schedulers.boundedElastic()).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then())).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then())).subscribe();
+        Mono.from(testee.throttle(Mono.fromRunnable(() -> executed.getAndSet(true)))).subscribe();
 
         // Then that task is eventually executed
         Awaitility.await().atMost(Duration.ofSeconds(10))
@@ -86,8 +85,8 @@ class ReactiveThrottlerTest {
 
         // When I await a submitted task execution and it is queued
         AtomicBoolean executed = new AtomicBoolean(false);
-        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then())).subscribeOn(Schedulers.boundedElastic()).subscribe();
-        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then())).subscribeOn(Schedulers.boundedElastic()).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then())).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then())).subscribe();
         Mono.from(testee.throttle(Mono.fromRunnable(() -> executed.getAndSet(true)))).block();
 
         // Then when done that task have been executed
@@ -101,10 +100,10 @@ class ReactiveThrottlerTest {
 
         // When I submit too many tasks task
         AtomicBoolean executed = new AtomicBoolean(false);
-        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then())).subscribeOn(Schedulers.boundedElastic()).subscribe();
-        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then())).subscribeOn(Schedulers.boundedElastic()).subscribe();
-        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then())).subscribeOn(Schedulers.boundedElastic()).subscribe();
-        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then())).subscribeOn(Schedulers.boundedElastic()).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then())).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then())).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then())).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then())).subscribe();
 
         // Then extra tasks are rejected
         assertThatThrownBy(() -> Mono.from(testee.throttle(Mono.fromRunnable(() -> executed.getAndSet(true)))).block())
@@ -122,9 +121,9 @@ class ReactiveThrottlerTest {
         AtomicBoolean executed = new AtomicBoolean(false);
         Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(100)).then()))
             .then(Mono.fromRunnable(() -> executed.getAndSet(true)))
-            .subscribeOn(Schedulers.boundedElastic())
+
             .subscribe();
-        Mono.from(testee.throttle(Mono.delay(Duration.ofSeconds(2)).then())).subscribeOn(Schedulers.boundedElastic()).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofSeconds(2)).then())).subscribe();
 
         // Then extra tasks are rejected
         Thread.sleep(200);
@@ -148,10 +147,10 @@ class ReactiveThrottlerTest {
                 int i = concurrentTasks.getAndDecrement();
                 concurrentTasksCountSnapshots.add(i);
             }));
-        Mono.from(testee.throttle(operation)).subscribeOn(Schedulers.boundedElastic()).subscribe();
-        Mono.from(testee.throttle(operation)).subscribeOn(Schedulers.boundedElastic()).subscribe();
-        Mono.from(testee.throttle(operation)).subscribeOn(Schedulers.boundedElastic()).subscribe();
-        Mono.from(testee.throttle(operation)).subscribeOn(Schedulers.boundedElastic()).subscribe();
+        Mono.from(testee.throttle(operation)).subscribe();
+        Mono.from(testee.throttle(operation)).subscribe();
+        Mono.from(testee.throttle(operation)).subscribe();
+        Mono.from(testee.throttle(operation)).subscribe();
 
         // Then maximum parallelism is not exceeded
         Awaitility.await().untilAsserted(() -> assertThat(concurrentTasksCountSnapshots.size()).isEqualTo(8));


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 04/07: JAMES-3816 Documentation for IMAP throttling

Posted by rc...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit f388f029c8aeeab93b67c45e8909caf4dbaa0b0e
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri Sep 9 15:45:39 2022 +0700

    JAMES-3816 Documentation for IMAP throttling
---
 .../distributed-app/docs/modules/ROOT/pages/configure/imap.adoc   | 8 ++++++++
 src/site/xdoc/server/config-imap4.xml                             | 6 ++++++
 2 files changed, 14 insertions(+)

diff --git a/server/apps/distributed-app/docs/modules/ROOT/pages/configure/imap.adoc b/server/apps/distributed-app/docs/modules/ROOT/pages/configure/imap.adoc
index 75c22eb21e..ec33b5a7c5 100644
--- a/server/apps/distributed-app/docs/modules/ROOT/pages/configure/imap.adoc
+++ b/server/apps/distributed-app/docs/modules/ROOT/pages/configure/imap.adoc
@@ -107,6 +107,14 @@ will use the specified value.
 | connectionLimitPerIP
 | Set the maximum simultaneous incoming connections per IP for this service
 
+| concurrentRequests
+| Maximum number of IMAP requests executed simultaneously. Past that limit requests are queued. Defaults to 20.
+Negative values deactivate this feature, leading to unbounded concurrency.
+
+| maxQueueSize
+| Upper bound to the IMAP throttler queue. Upon burst, requests that cannot be queued are rejected and not executed.
+Integer, defaults to 4096, must be positive, 0 means no queue.
+
 | proxyRequired
 | Enables proxy support for this service for incoming connections. HAProxy's protocol
 (https://www.haproxy.org/download/2.7/doc/proxy-protocol.txt) is used and might be compatible
diff --git a/src/site/xdoc/server/config-imap4.xml b/src/site/xdoc/server/config-imap4.xml
index 0da6435b53..405f6feae7 100644
--- a/src/site/xdoc/server/config-imap4.xml
+++ b/src/site/xdoc/server/config-imap4.xml
@@ -80,6 +80,12 @@
         <dd>Set the maximum simultaneous incoming connections for this service</dd>
         <dt><strong>handler.connectionLimitPerIP</strong></dt>
         <dd>Set the maximum simultaneous incoming connections per IP for this service</dd>
+        <dt><strong>concurrentRequests</strong></dt>
+        <dd>Maximum number of IMAP requests executed simultaneously. Past that limit requests are queued. Defaults to 20.
+            Negative values deactivate this feature, leading to unbounded concurrency.</dd>
+        <dt><strong>maxQueueSize</strong></dt>
+        <dd>Upper bound to the IMAP throttler queue. Upon burst, requests that cannot be queued are rejected and not executed.
+            Integer, defaults to 4096, must be positive, 0 means no queue.</dd>
         <dt><strong>handler.proxyRequired</strong></dt>
         <dd>
           Enables proxy support for this service for incoming connections. HAProxy's protocol


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 01/07: JAMES-3816 throttler for reactive workloads

Posted by rc...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 7897213d55a8fcfa098e83e3386b1744afd52fb6
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Mon Sep 5 14:27:41 2022 +0700

    JAMES-3816 throttler for reactive workloads
---
 .../james/imapserver/netty/ReactiveThrottler.java  |  86 ++++++++++++++
 .../imapserver/netty/ReactiveThrottlerTest.java    | 124 +++++++++++++++++++++
 2 files changed, 210 insertions(+)

diff --git a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ReactiveThrottler.java b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ReactiveThrottler.java
new file mode 100644
index 0000000000..5e2b607c01
--- /dev/null
+++ b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ReactiveThrottler.java
@@ -0,0 +1,86 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.imapserver.netty;
+
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.james.metrics.api.GaugeRegistry;
+import org.reactivestreams.Publisher;
+
+import reactor.core.publisher.Mono;
+import reactor.core.publisher.Sinks;
+
+public class ReactiveThrottler {
+    public static class RejectedException extends RuntimeException {
+        public RejectedException(String message) {
+            super(message);
+        }
+    }
+
+    private final int maxConcurrentRequests;
+    private final int maxQueueSize;
+    // In flight + executing
+    private final AtomicInteger concurrentRequests = new AtomicInteger(0);
+    private final Queue<Publisher<Void>> queue = new ConcurrentLinkedQueue<>();
+
+    public ReactiveThrottler(GaugeRegistry gaugeRegistry, int maxConcurrentRequests, int maxQueueSize) {
+        gaugeRegistry.register("imap.request.queue.size", () -> Math.max(concurrentRequests.get() - maxConcurrentRequests, 0));
+
+        this.maxConcurrentRequests = maxConcurrentRequests;
+        this.maxQueueSize = maxQueueSize;
+    }
+
+    public Publisher<Void> throttle(Publisher<Void> task) {
+        int requestNumber = concurrentRequests.incrementAndGet();
+
+        if (requestNumber <= maxConcurrentRequests) {
+            // We have capacity for one more concurrent request
+            return Mono.from(task)
+                .then(Mono.defer(this::onRequestDone));
+        } else if (requestNumber <= maxQueueSize + maxConcurrentRequests) {
+            // Queue the request for later
+            Sinks.One<Void> one = Sinks.one();
+            queue.add(Mono.from(task)
+                .then(Mono.fromRunnable(() -> one.emitEmpty(Sinks.EmitFailureHandler.FAIL_FAST))));
+            // Let the caller await task completion
+            return one.asMono();
+        } else {
+            concurrentRequests.decrementAndGet();
+
+            return Mono.error(new RejectedException(
+                String.format(
+                    "The IMAP server has reached its maximum capacity "
+                        + "(concurrent requests: %d, queue size: %d)",
+                    maxConcurrentRequests, maxQueueSize)));
+        }
+    }
+
+    private Mono<Void> onRequestDone() {
+        concurrentRequests.decrementAndGet();
+        Publisher<Void> throttled = queue.poll();
+        if (throttled != null) {
+            return Mono.from(throttled)
+                .then(Mono.defer(this::onRequestDone));
+        }
+        return Mono.empty();
+    }
+}
diff --git a/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/ReactiveThrottlerTest.java b/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/ReactiveThrottlerTest.java
new file mode 100644
index 0000000000..e8d626c2dc
--- /dev/null
+++ b/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/ReactiveThrottlerTest.java
@@ -0,0 +1,124 @@
+package org.apache.james.imapserver.netty;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.time.Duration;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.james.metrics.api.NoopGaugeRegistry;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+
+class ReactiveThrottlerTest {
+    @Test
+    void throttleShouldExecuteSubmittedTasks() {
+        // Given a throttler
+        ReactiveThrottler testee = new ReactiveThrottler(new NoopGaugeRegistry(), 2, 2);
+
+        // When I submit a task
+        AtomicBoolean executed = new AtomicBoolean(false);
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(Mono.fromRunnable(() -> executed.getAndSet(true))))).block();
+
+        // Then that task is executed
+        assertThat(executed.get()).isTrue();
+    }
+
+    @Test
+    void throttleShouldNotExecuteQueuedTasksLogicRightAway() {
+        // Given a throttler
+        ReactiveThrottler testee = new ReactiveThrottler(new NoopGaugeRegistry(), 2, 2);
+
+        // When I submit many tasks task
+        AtomicBoolean executed = new AtomicBoolean(false);
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(200)).then())).subscribeOn(Schedulers.elastic()).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(200)).then())).subscribeOn(Schedulers.elastic()).subscribe();
+        Mono.from(testee.throttle(Mono.fromRunnable(() -> executed.getAndSet(true)))).subscribeOn(Schedulers.elastic()).subscribe();
+
+        // Then that task is not executed straight away
+        assertThat(executed.get()).isFalse();
+    }
+
+    @Test
+    void throttleShouldEventuallyExecuteQueuedTasks() {
+        // Given a throttler
+        ReactiveThrottler testee = new ReactiveThrottler(new NoopGaugeRegistry(), 2, 2);
+
+        // When I submit many tasks task
+        AtomicBoolean executed = new AtomicBoolean(false);
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then())).subscribeOn(Schedulers.elastic()).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then())).subscribeOn(Schedulers.elastic()).subscribe();
+        Mono.from(testee.throttle(Mono.fromRunnable(() -> executed.getAndSet(true)))).subscribeOn(Schedulers.elastic()).subscribe();
+
+        // Then that task is eventually executed
+        Awaitility.await().atMost(Duration.ofSeconds(10))
+            .untilAsserted(() -> assertThat(executed.get()).isTrue());
+    }
+
+    @Test
+    void throttleShouldCompleteWhenSubmittedTaskCompletes() {
+        // Given a throttler
+        ReactiveThrottler testee = new ReactiveThrottler(new NoopGaugeRegistry(), 2, 2);
+
+        // When I await a submitted task execution and it is queued
+        AtomicBoolean executed = new AtomicBoolean(false);
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then())).subscribeOn(Schedulers.elastic()).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then())).subscribeOn(Schedulers.elastic()).subscribe();
+        Mono.from(testee.throttle(Mono.fromRunnable(() -> executed.getAndSet(true)))).block();
+
+        // Then when done that task have been executed
+        assertThat(executed.get()).isTrue();
+    }
+
+    @Test
+    void throttleShouldRejectTasksWhenTheQueueIsFull() {
+        // Given a throttler
+        ReactiveThrottler testee = new ReactiveThrottler(new NoopGaugeRegistry(), 2, 2);
+
+        // When I submit too many tasks task
+        AtomicBoolean executed = new AtomicBoolean(false);
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then())).subscribeOn(Schedulers.elastic()).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then())).subscribeOn(Schedulers.elastic()).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then())).subscribeOn(Schedulers.elastic()).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then())).subscribeOn(Schedulers.elastic()).subscribe();
+
+        // Then extra tasks are rejected
+        assertThatThrownBy(() -> Mono.from(testee.throttle(Mono.fromRunnable(() -> executed.getAndSet(true)))).block())
+            .isInstanceOf(ReactiveThrottler.RejectedException.class);
+        // And the task is not executed
+        assertThat(executed.get()).isFalse();
+    }
+
+    @Test
+    void throttleShouldNotExceedItsConcurrency() {
+        // Given a throttler
+        ReactiveThrottler testee = new ReactiveThrottler(new NoopGaugeRegistry(), 2, 2);
+
+        // When I submit many tasks task
+        AtomicInteger concurrentTasks = new AtomicInteger(0);
+        ConcurrentLinkedDeque<Integer> concurrentTasksCountSnapshots = new ConcurrentLinkedDeque<>();
+
+        Mono<Void> operation = Mono.fromRunnable(() -> {
+            int i = concurrentTasks.incrementAndGet();
+            concurrentTasksCountSnapshots.add(i);
+        }).then(Mono.delay(Duration.ofMillis(50)))
+            .then(Mono.fromRunnable(() -> {
+                int i = concurrentTasks.getAndDecrement();
+                concurrentTasksCountSnapshots.add(i);
+            }));
+        Mono.from(testee.throttle(operation)).subscribeOn(Schedulers.elastic()).subscribe();
+        Mono.from(testee.throttle(operation)).subscribeOn(Schedulers.elastic()).subscribe();
+        Mono.from(testee.throttle(operation)).subscribeOn(Schedulers.elastic()).subscribe();
+        Mono.from(testee.throttle(operation)).subscribeOn(Schedulers.elastic()).subscribe();
+
+        // Then maximum parallelism is not exceeded
+        Awaitility.await().untilAsserted(() -> assertThat(concurrentTasksCountSnapshots.size()).isEqualTo(8));
+        assertThat(concurrentTasksCountSnapshots)
+            .allSatisfy(i -> assertThat(i).isBetween(0, 2));
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 05/07: JAMES-3816 IMAP throttler should not await nested tasks

Posted by rc...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit da71b92cf4e2452c42596c695adcd1c99773b9fe
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Mon Sep 12 19:32:38 2022 +0700

    JAMES-3816 IMAP throttler should not await nested tasks
---
 .../james/imapserver/netty/ReactiveThrottler.java  | 12 ++--
 .../imapserver/netty/ReactiveThrottlerTest.java    | 69 +++++++++++++++++-----
 2 files changed, 60 insertions(+), 21 deletions(-)

diff --git a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ReactiveThrottler.java b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ReactiveThrottler.java
index 0d9028d556..2a53e4a58b 100644
--- a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ReactiveThrottler.java
+++ b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ReactiveThrottler.java
@@ -28,6 +28,7 @@ import org.reactivestreams.Publisher;
 
 import reactor.core.publisher.Mono;
 import reactor.core.publisher.Sinks;
+import reactor.core.scheduler.Schedulers;
 
 public class ReactiveThrottler {
     public static class RejectedException extends RuntimeException {
@@ -58,7 +59,7 @@ public class ReactiveThrottler {
         if (requestNumber <= maxConcurrentRequests) {
             // We have capacity for one more concurrent request
             return Mono.from(task)
-                .then(Mono.defer(this::onRequestDone));
+                .doFinally(any -> onRequestDone());
         } else if (requestNumber <= maxQueueSize + maxConcurrentRequests) {
             // Queue the request for later
             Sinks.One<Void> one = Sinks.one();
@@ -77,13 +78,14 @@ public class ReactiveThrottler {
         }
     }
 
-    private Mono<Void> onRequestDone() {
+    private void onRequestDone() {
         concurrentRequests.decrementAndGet();
         Publisher<Void> throttled = queue.poll();
         if (throttled != null) {
-            return Mono.from(throttled)
-                .then(Mono.defer(this::onRequestDone));
+            Mono.from(throttled)
+                .doFinally(any -> onRequestDone())
+                .subscribeOn(Schedulers.parallel())
+                .subscribe();
         }
-        return Mono.empty();
     }
 }
diff --git a/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/ReactiveThrottlerTest.java b/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/ReactiveThrottlerTest.java
index e8d626c2dc..3b767f61cc 100644
--- a/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/ReactiveThrottlerTest.java
+++ b/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/ReactiveThrottlerTest.java
@@ -1,3 +1,22 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
 package org.apache.james.imapserver.netty;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -36,9 +55,9 @@ class ReactiveThrottlerTest {
 
         // When I submit many tasks task
         AtomicBoolean executed = new AtomicBoolean(false);
-        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(200)).then())).subscribeOn(Schedulers.elastic()).subscribe();
-        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(200)).then())).subscribeOn(Schedulers.elastic()).subscribe();
-        Mono.from(testee.throttle(Mono.fromRunnable(() -> executed.getAndSet(true)))).subscribeOn(Schedulers.elastic()).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(200)).then())).subscribeOn(Schedulers.boundedElastic()).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(200)).then())).subscribeOn(Schedulers.boundedElastic()).subscribe();
+        Mono.from(testee.throttle(Mono.fromRunnable(() -> executed.getAndSet(true)))).subscribeOn(Schedulers.boundedElastic()).subscribe();
 
         // Then that task is not executed straight away
         assertThat(executed.get()).isFalse();
@@ -51,9 +70,9 @@ class ReactiveThrottlerTest {
 
         // When I submit many tasks task
         AtomicBoolean executed = new AtomicBoolean(false);
-        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then())).subscribeOn(Schedulers.elastic()).subscribe();
-        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then())).subscribeOn(Schedulers.elastic()).subscribe();
-        Mono.from(testee.throttle(Mono.fromRunnable(() -> executed.getAndSet(true)))).subscribeOn(Schedulers.elastic()).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then())).subscribeOn(Schedulers.boundedElastic()).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then())).subscribeOn(Schedulers.boundedElastic()).subscribe();
+        Mono.from(testee.throttle(Mono.fromRunnable(() -> executed.getAndSet(true)))).subscribeOn(Schedulers.boundedElastic()).subscribe();
 
         // Then that task is eventually executed
         Awaitility.await().atMost(Duration.ofSeconds(10))
@@ -67,8 +86,8 @@ class ReactiveThrottlerTest {
 
         // When I await a submitted task execution and it is queued
         AtomicBoolean executed = new AtomicBoolean(false);
-        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then())).subscribeOn(Schedulers.elastic()).subscribe();
-        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then())).subscribeOn(Schedulers.elastic()).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then())).subscribeOn(Schedulers.boundedElastic()).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then())).subscribeOn(Schedulers.boundedElastic()).subscribe();
         Mono.from(testee.throttle(Mono.fromRunnable(() -> executed.getAndSet(true)))).block();
 
         // Then when done that task have been executed
@@ -82,10 +101,10 @@ class ReactiveThrottlerTest {
 
         // When I submit too many tasks task
         AtomicBoolean executed = new AtomicBoolean(false);
-        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then())).subscribeOn(Schedulers.elastic()).subscribe();
-        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then())).subscribeOn(Schedulers.elastic()).subscribe();
-        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then())).subscribeOn(Schedulers.elastic()).subscribe();
-        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then())).subscribeOn(Schedulers.elastic()).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then())).subscribeOn(Schedulers.boundedElastic()).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then())).subscribeOn(Schedulers.boundedElastic()).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then())).subscribeOn(Schedulers.boundedElastic()).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then())).subscribeOn(Schedulers.boundedElastic()).subscribe();
 
         // Then extra tasks are rejected
         assertThatThrownBy(() -> Mono.from(testee.throttle(Mono.fromRunnable(() -> executed.getAndSet(true)))).block())
@@ -94,6 +113,24 @@ class ReactiveThrottlerTest {
         assertThat(executed.get()).isFalse();
     }
 
+    @Test
+    void throttleShouldNotAwaitOtherTasks() throws Exception {
+        // Given a throttler
+        ReactiveThrottler testee = new ReactiveThrottler(new NoopGaugeRegistry(), 2, 2);
+
+        // When I submit a short and a long task
+        AtomicBoolean executed = new AtomicBoolean(false);
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(100)).then()))
+            .then(Mono.fromRunnable(() -> executed.getAndSet(true)))
+            .subscribeOn(Schedulers.boundedElastic())
+            .subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofSeconds(2)).then())).subscribeOn(Schedulers.boundedElastic()).subscribe();
+
+        // Then extra tasks are rejected
+        Thread.sleep(200);
+        assertThat(executed.get()).isTrue();
+    }
+
     @Test
     void throttleShouldNotExceedItsConcurrency() {
         // Given a throttler
@@ -111,10 +148,10 @@ class ReactiveThrottlerTest {
                 int i = concurrentTasks.getAndDecrement();
                 concurrentTasksCountSnapshots.add(i);
             }));
-        Mono.from(testee.throttle(operation)).subscribeOn(Schedulers.elastic()).subscribe();
-        Mono.from(testee.throttle(operation)).subscribeOn(Schedulers.elastic()).subscribe();
-        Mono.from(testee.throttle(operation)).subscribeOn(Schedulers.elastic()).subscribe();
-        Mono.from(testee.throttle(operation)).subscribeOn(Schedulers.elastic()).subscribe();
+        Mono.from(testee.throttle(operation)).subscribeOn(Schedulers.boundedElastic()).subscribe();
+        Mono.from(testee.throttle(operation)).subscribeOn(Schedulers.boundedElastic()).subscribe();
+        Mono.from(testee.throttle(operation)).subscribeOn(Schedulers.boundedElastic()).subscribe();
+        Mono.from(testee.throttle(operation)).subscribeOn(Schedulers.boundedElastic()).subscribe();
 
         // Then maximum parallelism is not exceeded
         Awaitility.await().untilAsserted(() -> assertThat(concurrentTasksCountSnapshots.size()).isEqualTo(8));


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 02/07: JAMES-3816 Plug ReactiveThrottler into IMAPServer

Posted by rc...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 058579a67c2b849c6f3024c123a832ae9bfab60a
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri Sep 9 15:23:47 2022 +0700

    JAMES-3816 Plug ReactiveThrottler into IMAPServer
---
 .../apache/james/imap/api/ImapConfiguration.java   | 44 +++++++++++-
 .../apache/james/imapserver/netty/IMAPServer.java  | 18 +++--
 .../james/imapserver/netty/IMAPServerFactory.java  |  7 +-
 .../netty/ImapChannelUpstreamHandler.java          | 84 +++++++++++-----------
 .../james/imapserver/netty/ReactiveThrottler.java  |  2 +-
 .../META-INF/spring/imapserver-context.xml         |  1 +
 .../netty/IMAPServerConfigurationTest.java         |  6 ++
 .../james/imapserver/netty/IMAPServerTest.java     |  4 +-
 8 files changed, 112 insertions(+), 54 deletions(-)

diff --git a/protocols/imap/src/main/java/org/apache/james/imap/api/ImapConfiguration.java b/protocols/imap/src/main/java/org/apache/james/imap/api/ImapConfiguration.java
index 4db594d3d7..21f8928774 100644
--- a/protocols/imap/src/main/java/org/apache/james/imap/api/ImapConfiguration.java
+++ b/protocols/imap/src/main/java/org/apache/james/imap/api/ImapConfiguration.java
@@ -35,12 +35,15 @@ public class ImapConfiguration {
     public static final boolean DEFAULT_ENABLE_IDLE = true;
     public static final long DEFAULT_HEARTBEAT_INTERVAL_IN_SECONDS = 2 * 60;
     public static final TimeUnit DEFAULT_HEARTBEAT_INTERVAL_UNIT = TimeUnit.SECONDS;
+    public static final int DEFAULT_CONCURRENT_REQUESTS = 128;
+    public static final int DEFAULT_QUEUE_SIZE = 4096;
 
     public static Builder builder() {
         return new Builder();
     }
 
     public static class Builder {
+
         private static boolean noBlankString(String disableCap) {
             return !StringUtils.isBlank(disableCap);
         }
@@ -48,12 +51,16 @@ public class ImapConfiguration {
         private static final boolean DEFAULT_CONDSTORE_DISABLE = false;
 
         private Optional<Long> idleTimeInterval;
+        private Optional<Integer> concurrentRequests;
+        private Optional<Integer> maxQueueSize;
         private Optional<TimeUnit> idleTimeIntervalUnit;
         private Optional<Boolean> enableIdle;
         private ImmutableSet<String> disabledCaps;
         private Optional<Boolean> isCondstoreEnable;
 
         private Builder() {
+            this.concurrentRequests = Optional.empty();
+            this.maxQueueSize = Optional.empty();
             this.idleTimeInterval = Optional.empty();
             this.idleTimeIntervalUnit = Optional.empty();
             this.enableIdle = Optional.empty();
@@ -62,11 +69,23 @@ public class ImapConfiguration {
         }
 
         public Builder idleTimeInterval(long idleTimeInterval) {
-            Preconditions.checkArgument(idleTimeInterval > 0, "The interval time should not be rezo or negative");
+            Preconditions.checkArgument(idleTimeInterval > 0, "The interval time should not be zero or negative");
             this.idleTimeInterval = Optional.of(idleTimeInterval);
             return this;
         }
 
+        public Builder concurrentRequests(int concurrentRequests) {
+            Preconditions.checkArgument(concurrentRequests > 0, "concurrentRequests should not be zero or negative");
+            this.concurrentRequests = Optional.of(concurrentRequests);
+            return this;
+        }
+
+        public Builder maxQueueSize(int maxQueueSize) {
+            Preconditions.checkArgument(maxQueueSize > 0, "maxQueueSize should not be negative");
+            this.maxQueueSize = Optional.of(maxQueueSize);
+            return this;
+        }
+
         public Builder idleTimeIntervalUnit(TimeUnit idleTimeIntervalUnit) {
             this.idleTimeIntervalUnit = Optional.of(idleTimeIntervalUnit);
             return this;
@@ -106,6 +125,8 @@ public class ImapConfiguration {
             return new ImapConfiguration(
                     enableIdle.orElse(DEFAULT_ENABLE_IDLE),
                     idleTimeInterval.orElse(DEFAULT_HEARTBEAT_INTERVAL_IN_SECONDS),
+                    concurrentRequests.orElse(DEFAULT_CONCURRENT_REQUESTS),
+                    maxQueueSize.orElse(DEFAULT_QUEUE_SIZE),
                     idleTimeIntervalUnit.orElse(DEFAULT_HEARTBEAT_INTERVAL_UNIT),
                     normalizeDisableCaps,
                     isCondstoreEnable.orElse(DEFAULT_CONDSTORE_DISABLE));
@@ -113,19 +134,31 @@ public class ImapConfiguration {
     }
 
     private final long idleTimeInterval;
+    private final int concurrentRequests;
+    private final int maxQueueSize;
     private final TimeUnit idleTimeIntervalUnit;
     private final ImmutableSet<Capability> disabledCaps;
     private final boolean enableIdle;
     private final boolean isCondstoreEnable;
 
-    private ImapConfiguration(boolean enableIdle, long idleTimeInterval, TimeUnit idleTimeIntervalUnit, ImmutableSet<Capability> disabledCaps, boolean isCondstoreEnable) {
+    private ImapConfiguration(boolean enableIdle, long idleTimeInterval, int concurrentRequests, int maxQueueSize, TimeUnit idleTimeIntervalUnit, ImmutableSet<Capability> disabledCaps, boolean isCondstoreEnable) {
         this.enableIdle = enableIdle;
         this.idleTimeInterval = idleTimeInterval;
+        this.concurrentRequests = concurrentRequests;
+        this.maxQueueSize = maxQueueSize;
         this.idleTimeIntervalUnit = idleTimeIntervalUnit;
         this.disabledCaps = disabledCaps;
         this.isCondstoreEnable = isCondstoreEnable;
     }
 
+    public int getConcurrentRequests() {
+        return concurrentRequests;
+    }
+
+    public int getMaxQueueSize() {
+        return maxQueueSize;
+    }
+
     public long getIdleTimeInterval() {
         return idleTimeInterval;
     }
@@ -157,6 +190,8 @@ public class ImapConfiguration {
             return Objects.equal(that.isEnableIdle(), enableIdle)
                 && Objects.equal(that.getIdleTimeInterval(), idleTimeInterval)
                 && Objects.equal(that.getIdleTimeIntervalUnit(), idleTimeIntervalUnit)
+                && Objects.equal(that.getConcurrentRequests(), concurrentRequests)
+                && Objects.equal(that.getMaxQueueSize(), maxQueueSize)
                 && Objects.equal(that.getDisabledCaps(), disabledCaps)
                 && Objects.equal(that.isCondstoreEnable(), isCondstoreEnable);
         }
@@ -165,7 +200,8 @@ public class ImapConfiguration {
 
     @Override
     public final int hashCode() {
-        return Objects.hashCode(enableIdle, idleTimeInterval, idleTimeIntervalUnit, disabledCaps, isCondstoreEnable);
+        return Objects.hashCode(enableIdle, idleTimeInterval, idleTimeIntervalUnit, disabledCaps, isCondstoreEnable,
+            concurrentRequests, maxQueueSize);
     }
 
     @Override
@@ -176,6 +212,8 @@ public class ImapConfiguration {
                 .add("idleTimeIntervalUnit", idleTimeIntervalUnit)
                 .add("disabledCaps", disabledCaps)
                 .add("isCondstoreEnable", isCondstoreEnable)
+                .add("concurrentRequests", concurrentRequests)
+                .add("maxQueueSize", maxQueueSize)
                 .toString();
     }
 }
diff --git a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/IMAPServer.java b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/IMAPServer.java
index 6c5fd0ab4a..25b2ea06db 100644
--- a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/IMAPServer.java
+++ b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/IMAPServer.java
@@ -31,6 +31,7 @@ import org.apache.james.imap.api.ImapConstants;
 import org.apache.james.imap.api.process.ImapProcessor;
 import org.apache.james.imap.decode.ImapDecoder;
 import org.apache.james.imap.encode.ImapEncoder;
+import org.apache.james.metrics.api.GaugeRegistry;
 import org.apache.james.protocols.api.OidcSASLConfiguration;
 import org.apache.james.protocols.lib.netty.AbstractConfigurableAsyncServer;
 import org.apache.james.protocols.netty.AbstractChannelPipelineFactory;
@@ -121,11 +122,16 @@ public class IMAPServer extends AbstractConfigurableAsyncServer implements ImapC
     private static final String SOFTWARE_TYPE = "JAMES " + VERSION + " Server ";
     private static final String DEFAULT_TIME_UNIT = "SECONDS";
     private static final String CAPABILITY_SEPARATOR = "|";
+    public static final int DEFAULT_MAX_LINE_LENGTH = 65536; // Use a big default
+    public static final Size DEFAULT_IN_MEMORY_SIZE_LIMIT = Size.of(10L, Size.Unit.M); // Use 10MB as default
+    public static final int DEFAULT_TIMEOUT = 30 * 60; // default timeout is 30 minutes
+    public static final int DEFAULT_LITERAL_SIZE_LIMIT = 0;
 
     private final ImapProcessor processor;
     private final ImapEncoder encoder;
     private final ImapDecoder decoder;
     private final ImapMetrics imapMetrics;
+    private final GaugeRegistry gaugeRegistry;
 
     private String hello;
     private boolean compress;
@@ -138,17 +144,15 @@ public class IMAPServer extends AbstractConfigurableAsyncServer implements ImapC
     private Optional<ConnectionPerIpLimitUpstreamHandler> connectionPerIpLimitUpstreamHandler = Optional.empty();
     private boolean ignoreIDLEUponProcessing;
     private Duration heartbeatInterval;
+    private ReactiveThrottler reactiveThrottler;
 
-    public static final int DEFAULT_MAX_LINE_LENGTH = 65536; // Use a big default
-    public static final Size DEFAULT_IN_MEMORY_SIZE_LIMIT = Size.of(10L, Size.Unit.M); // Use 10MB as default
-    public static final int DEFAULT_TIMEOUT = 30 * 60; // default timeout is 30 minutes
-    public static final int DEFAULT_LITERAL_SIZE_LIMIT = 0;
 
-    public IMAPServer(ImapDecoder decoder, ImapEncoder encoder, ImapProcessor processor, ImapMetrics imapMetrics) {
+    public IMAPServer(ImapDecoder decoder, ImapEncoder encoder, ImapProcessor processor, ImapMetrics imapMetrics, GaugeRegistry gaugeRegistry) {
         this.processor = processor;
         this.encoder = encoder;
         this.decoder = decoder;
         this.imapMetrics = imapMetrics;
+        this.gaugeRegistry = gaugeRegistry;
     }
 
     @Override
@@ -179,6 +183,7 @@ public class IMAPServer extends AbstractConfigurableAsyncServer implements ImapC
         ignoreIDLEUponProcessing = configuration.getBoolean("ignoreIDLEUponProcessing", true);
         ImapConfiguration imapConfiguration = getImapConfiguration(configuration);
         heartbeatInterval = imapConfiguration.idleTimeIntervalAsDuration();
+        reactiveThrottler = new ReactiveThrottler(gaugeRegistry, imapConfiguration.getConcurrentRequests(), imapConfiguration.getMaxQueueSize());
         processor.configure(imapConfiguration);
     }
 
@@ -190,6 +195,8 @@ public class IMAPServer extends AbstractConfigurableAsyncServer implements ImapC
                 .idleTimeInterval(configuration.getLong("idleTimeInterval", ImapConfiguration.DEFAULT_HEARTBEAT_INTERVAL_IN_SECONDS))
                 .idleTimeIntervalUnit(getTimeIntervalUnit(configuration.getString("idleTimeIntervalUnit", DEFAULT_TIME_UNIT)))
                 .disabledCaps(disabledCaps)
+                .maxQueueSize(configuration.getInteger("maxQueueSize", ImapConfiguration.DEFAULT_QUEUE_SIZE))
+                .concurrentRequests(configuration.getInteger("concurrentRequests", ImapConfiguration.DEFAULT_CONCURRENT_REQUESTS))
                 .build();
     }
 
@@ -271,6 +278,7 @@ public class IMAPServer extends AbstractConfigurableAsyncServer implements ImapC
     protected ChannelInboundHandlerAdapter createCoreHandler() {
         Encryption secure = getEncryption();
         return ImapChannelUpstreamHandler.builder()
+            .reactiveThrottler(reactiveThrottler)
             .hello(hello)
             .processor(processor)
             .encoder(encoder)
diff --git a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/IMAPServerFactory.java b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/IMAPServerFactory.java
index e7046b222c..8ce292b35e 100644
--- a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/IMAPServerFactory.java
+++ b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/IMAPServerFactory.java
@@ -29,6 +29,7 @@ import org.apache.james.filesystem.api.FileSystem;
 import org.apache.james.imap.api.process.ImapProcessor;
 import org.apache.james.imap.decode.ImapDecoder;
 import org.apache.james.imap.encode.ImapEncoder;
+import org.apache.james.metrics.api.GaugeRegistry;
 import org.apache.james.metrics.api.MetricFactory;
 import org.apache.james.protocols.lib.netty.AbstractConfigurableAsyncServer;
 import org.apache.james.protocols.lib.netty.AbstractServerFactory;
@@ -40,19 +41,21 @@ public class IMAPServerFactory extends AbstractServerFactory {
     protected final ImapEncoder encoder;
     protected final ImapProcessor processor;
     protected final ImapMetrics imapMetrics;
+    protected final GaugeRegistry gaugeRegistry;
 
     @Inject
     public IMAPServerFactory(FileSystem fileSystem, ImapDecoder decoder, ImapEncoder encoder, ImapProcessor processor,
-                             MetricFactory metricFactory) {
+                             MetricFactory metricFactory, GaugeRegistry gaugeRegistry) {
         this.fileSystem = fileSystem;
         this.decoder = decoder;
         this.encoder = encoder;
         this.processor = processor;
         this.imapMetrics = new ImapMetrics(metricFactory);
+        this.gaugeRegistry = gaugeRegistry;
     }
 
     protected IMAPServer createServer() {
-       return new IMAPServer(decoder, encoder, processor, imapMetrics);
+       return new IMAPServer(decoder, encoder, processor, imapMetrics, gaugeRegistry);
     }
     
     @Override
diff --git a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java
index fafe03bc4f..9736c3ae46 100644
--- a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java
+++ b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java
@@ -73,6 +73,12 @@ public class ImapChannelUpstreamHandler extends ChannelInboundHandlerAdapter imp
         private ImapMetrics imapMetrics;
         private boolean ignoreIDLEUponProcessing;
         private Duration heartbeatInterval;
+        private ReactiveThrottler reactiveThrottler;
+
+        public ImapChannelUpstreamHandlerBuilder reactiveThrottler(ReactiveThrottler reactiveThrottler) {
+            this.reactiveThrottler = reactiveThrottler;
+            return this;
+        }
 
         public ImapChannelUpstreamHandlerBuilder hello(String hello) {
             this.hello = hello;
@@ -120,7 +126,7 @@ public class ImapChannelUpstreamHandler extends ChannelInboundHandlerAdapter imp
         }
 
         public ImapChannelUpstreamHandler build() {
-            return new ImapChannelUpstreamHandler(hello, processor, encoder, compress, secure, imapMetrics, authenticationConfiguration, ignoreIDLEUponProcessing, (int) heartbeatInterval.toSeconds());
+            return new ImapChannelUpstreamHandler(hello, processor, encoder, compress, secure, imapMetrics, authenticationConfiguration, ignoreIDLEUponProcessing, (int) heartbeatInterval.toSeconds(), reactiveThrottler);
         }
     }
 
@@ -129,28 +135,20 @@ public class ImapChannelUpstreamHandler extends ChannelInboundHandlerAdapter imp
     }
 
     private final String hello;
-
     private final Encryption secure;
-
     private final boolean compress;
-
     private final ImapProcessor processor;
-
     private final ImapEncoder encoder;
-
     private final ImapHeartbeatHandler heartbeatHandler;
-
     private final AuthenticationConfiguration authenticationConfiguration;
-
     private final Metric imapConnectionsMetric;
-
     private final Metric imapCommandsMetric;
-
     private final boolean ignoreIDLEUponProcessing;
+    private final ReactiveThrottler reactiveThrottler;
 
     public ImapChannelUpstreamHandler(String hello, ImapProcessor processor, ImapEncoder encoder, boolean compress,
                                       Encryption secure, ImapMetrics imapMetrics, AuthenticationConfiguration authenticationConfiguration,
-                                      boolean ignoreIDLEUponProcessing, int heartbeatIntervalSeconds) {
+                                      boolean ignoreIDLEUponProcessing, int heartbeatIntervalSeconds, ReactiveThrottler reactiveThrottler) {
         this.hello = hello;
         this.processor = processor;
         this.encoder = encoder;
@@ -161,6 +159,7 @@ public class ImapChannelUpstreamHandler extends ChannelInboundHandlerAdapter imp
         this.imapCommandsMetric = imapMetrics.getCommandsMetric();
         this.ignoreIDLEUponProcessing = ignoreIDLEUponProcessing;
         this.heartbeatHandler = new ImapHeartbeatHandler(heartbeatIntervalSeconds, heartbeatIntervalSeconds, heartbeatIntervalSeconds);
+        this.reactiveThrottler = reactiveThrottler;
     }
 
     @Override
@@ -275,7 +274,7 @@ public class ImapChannelUpstreamHandler extends ChannelInboundHandlerAdapter imp
     }
 
     @Override
-    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+    public void channelRead(ChannelHandlerContext ctx, Object msg) {
         imapCommandsMetric.increment();
         ImapSession session = ctx.channel().attr(IMAP_SESSION_ATTRIBUTE_KEY).get();
         ImapResponseComposer response = new ImapResponseComposerImpl(new ChannelImapResponseWriter(ctx.channel()));
@@ -283,40 +282,41 @@ public class ImapChannelUpstreamHandler extends ChannelInboundHandlerAdapter imp
 
         beforeIDLEUponProcessing(ctx);
         ResponseEncoder responseEncoder = new ResponseEncoder(encoder, response);
-        processor.processReactive(message, responseEncoder, session)
-            .doOnEach(Throwing.consumer(signal -> {
-                if (session.getState() == ImapSessionState.LOGOUT) {
-                    // Make sure we close the channel after all the buffers were flushed out
-                    Channel channel = ctx.channel();
-                    if (channel.isActive()) {
-                        channel.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
+        reactiveThrottler.throttle(
+            processor.processReactive(message, responseEncoder, session)
+                .doOnEach(Throwing.consumer(signal -> {
+                    if (session.getState() == ImapSessionState.LOGOUT) {
+                        // Make sure we close the channel after all the buffers were flushed out
+                        Channel channel = ctx.channel();
+                        if (channel.isActive()) {
+                            channel.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
+                        }
                     }
-                }
-                if (signal.isOnComplete()) {
-                    IOException failure = responseEncoder.getFailure();
-                    if (failure != null) {
-                        try (Closeable mdc = ReactorUtils.retrieveMDCBuilder(signal).build()) {
-                            LOGGER.info(failure.getMessage());
-                            LOGGER.debug("Failed to write {}", message, failure);
-                        } catch (IOException e) {
-                            throw new RuntimeException(e);
+                    if (signal.isOnComplete()) {
+                        IOException failure = responseEncoder.getFailure();
+                        if (failure != null) {
+                            try (Closeable mdc = ReactorUtils.retrieveMDCBuilder(signal).build()) {
+                                LOGGER.info(failure.getMessage());
+                                LOGGER.debug("Failed to write {}", message, failure);
+                            } catch (IOException e) {
+                                throw new RuntimeException(e);
+                            }
+
+                            ctx.fireExceptionCaught(failure);
                         }
-
-                        ctx.fireExceptionCaught(failure);
                     }
-                }
-                if (signal.isOnComplete() || signal.isOnError()) {
-                    afterIDLEUponProcessing(ctx);
-                    if (message instanceof Closeable) {
-                        ((Closeable) message).close();
+                    if (signal.isOnComplete() || signal.isOnError()) {
+                        afterIDLEUponProcessing(ctx);
+                        if (message instanceof Closeable) {
+                            ((Closeable) message).close();
+                        }
                     }
-                }
-                if (signal.hasError()) {
-                    ctx.fireExceptionCaught(signal.getThrowable());
-                }
-                ctx.fireChannelReadComplete();
-            }))
-            .contextWrite(ReactorUtils.context("imap", mdc(ctx)))
+                    if (signal.hasError()) {
+                        ctx.fireExceptionCaught(signal.getThrowable());
+                    }
+                    ctx.fireChannelReadComplete();
+                }))
+                .contextWrite(ReactorUtils.context("imap", mdc(ctx))))
             .subscribe();
     }
 
diff --git a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ReactiveThrottler.java b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ReactiveThrottler.java
index 5e2b607c01..67df59b84f 100644
--- a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ReactiveThrottler.java
+++ b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ReactiveThrottler.java
@@ -49,7 +49,7 @@ public class ReactiveThrottler {
         this.maxQueueSize = maxQueueSize;
     }
 
-    public Publisher<Void> throttle(Publisher<Void> task) {
+    public Mono<Void> throttle(Publisher<Void> task) {
         int requestNumber = concurrentRequests.incrementAndGet();
 
         if (requestNumber <= maxConcurrentRequests) {
diff --git a/server/protocols/protocols-imap4/src/main/resources/META-INF/spring/imapserver-context.xml b/server/protocols/protocols-imap4/src/main/resources/META-INF/spring/imapserver-context.xml
index 5d0ee124fa..82cc971e89 100644
--- a/server/protocols/protocols-imap4/src/main/resources/META-INF/spring/imapserver-context.xml
+++ b/server/protocols/protocols-imap4/src/main/resources/META-INF/spring/imapserver-context.xml
@@ -25,6 +25,7 @@
         <constructor-arg index="2" ref="imapEncoder" />
         <constructor-arg index="3" ref="imapProcessor" />
         <constructor-arg index="4" ref="metricFactory" />
+        <constructor-arg index="5" ref="gaugeRegistry" />
     </bean>
 
     <!-- The imapProcessor configuration will be reviewed when IMAP will be integrated into Protocols project -->
diff --git a/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/IMAPServerConfigurationTest.java b/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/IMAPServerConfigurationTest.java
index e98889f1ce..d8060a3fda 100644
--- a/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/IMAPServerConfigurationTest.java
+++ b/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/IMAPServerConfigurationTest.java
@@ -40,6 +40,8 @@ class IMAPServerConfigurationTest {
                 .enableIdle(ImapConfiguration.DEFAULT_ENABLE_IDLE)
                 .idleTimeInterval(ImapConfiguration.DEFAULT_HEARTBEAT_INTERVAL_IN_SECONDS)
                 .idleTimeIntervalUnit(ImapConfiguration.DEFAULT_HEARTBEAT_INTERVAL_UNIT)
+                .maxQueueSize(ImapConfiguration.DEFAULT_QUEUE_SIZE)
+                .concurrentRequests(ImapConfiguration.DEFAULT_CONCURRENT_REQUESTS)
                 .disabledCaps(ImmutableSet.<String>of())
                 .build();
 
@@ -51,6 +53,8 @@ class IMAPServerConfigurationTest {
         HierarchicalConfiguration<ImmutableNode> configurationBuilder = new BaseHierarchicalConfiguration();
         configurationBuilder.addProperty("enableIdle", "false");
         configurationBuilder.addProperty("idleTimeInterval", "1");
+        configurationBuilder.addProperty("maxQueueSize", "12");
+        configurationBuilder.addProperty("concurrentRequests", "42");
         configurationBuilder.addProperty("idleTimeIntervalUnit", "MINUTES");
         configurationBuilder.addProperty("disabledCaps", "ACL | MOVE");
         ImapConfiguration imapConfiguration = IMAPServer.getImapConfiguration(configurationBuilder);
@@ -60,6 +64,8 @@ class IMAPServerConfigurationTest {
                 .idleTimeInterval(1)
                 .idleTimeIntervalUnit(TimeUnit.MINUTES)
                 .disabledCaps(ImmutableSet.of("ACL", "MOVE"))
+                .maxQueueSize(12)
+                .concurrentRequests(42)
                 .build();
 
         assertThat(imapConfiguration).isEqualTo(expectImapConfiguration);
diff --git a/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/IMAPServerTest.java b/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/IMAPServerTest.java
index 7671233d16..f504e1be8d 100644
--- a/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/IMAPServerTest.java
+++ b/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/IMAPServerTest.java
@@ -85,6 +85,7 @@ import org.apache.james.mailbox.model.UidValidity;
 import org.apache.james.mailbox.store.FakeAuthenticator;
 import org.apache.james.mailbox.store.FakeAuthorizator;
 import org.apache.james.mailbox.store.StoreSubscriptionManager;
+import org.apache.james.metrics.api.NoopGaugeRegistry;
 import org.apache.james.metrics.tests.RecordingMetricFactory;
 import org.apache.james.protocols.api.OIDCSASLHelper;
 import org.apache.james.protocols.api.utils.BogusSslContextFactory;
@@ -157,7 +158,8 @@ class IMAPServerTest {
                 memoryIntegrationResources.getQuotaManager(),
                 memoryIntegrationResources.getQuotaRootResolver(),
                 metricFactory),
-            new ImapMetrics(metricFactory));
+            new ImapMetrics(metricFactory),
+            new NoopGaugeRegistry());
 
         Configuration configuration = Configuration.builder()
             .workingDirectory("../")


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 03/07: JAMES-3816 Allow turning off ReactiveThrottler

Posted by rc...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 72c5ebbb3be1cc5e7915102b2727c9d9a238089c
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri Sep 9 15:37:53 2022 +0700

    JAMES-3816 Allow turning off ReactiveThrottler
---
 .../src/main/java/org/apache/james/imap/api/ImapConfiguration.java     | 1 -
 .../main/java/org/apache/james/imapserver/netty/ReactiveThrottler.java | 3 +++
 2 files changed, 3 insertions(+), 1 deletion(-)

diff --git a/protocols/imap/src/main/java/org/apache/james/imap/api/ImapConfiguration.java b/protocols/imap/src/main/java/org/apache/james/imap/api/ImapConfiguration.java
index 21f8928774..5fbbfb4893 100644
--- a/protocols/imap/src/main/java/org/apache/james/imap/api/ImapConfiguration.java
+++ b/protocols/imap/src/main/java/org/apache/james/imap/api/ImapConfiguration.java
@@ -75,7 +75,6 @@ public class ImapConfiguration {
         }
 
         public Builder concurrentRequests(int concurrentRequests) {
-            Preconditions.checkArgument(concurrentRequests > 0, "concurrentRequests should not be zero or negative");
             this.concurrentRequests = Optional.of(concurrentRequests);
             return this;
         }
diff --git a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ReactiveThrottler.java b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ReactiveThrottler.java
index 67df59b84f..0d9028d556 100644
--- a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ReactiveThrottler.java
+++ b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ReactiveThrottler.java
@@ -50,6 +50,9 @@ public class ReactiveThrottler {
     }
 
     public Mono<Void> throttle(Publisher<Void> task) {
+        if (maxConcurrentRequests < 0) {
+            return Mono.from(task);
+        }
         int requestNumber = concurrentRequests.incrementAndGet();
 
         if (requestNumber <= maxConcurrentRequests) {


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 06/07: JAMES-3816 Start nested IMAP requests without switching threads

Posted by rc...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 5b6b9accf45be1efd47a50f7e54a85c2ed9b37bd
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri Sep 16 21:50:30 2022 +0700

    JAMES-3816 Start nested IMAP requests without switching threads
    
    Not needed
---
 .../main/java/org/apache/james/imapserver/netty/ReactiveThrottler.java  | 2 --
 1 file changed, 2 deletions(-)

diff --git a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ReactiveThrottler.java b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ReactiveThrottler.java
index 2a53e4a58b..22d4b5acc5 100644
--- a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ReactiveThrottler.java
+++ b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ReactiveThrottler.java
@@ -28,7 +28,6 @@ import org.reactivestreams.Publisher;
 
 import reactor.core.publisher.Mono;
 import reactor.core.publisher.Sinks;
-import reactor.core.scheduler.Schedulers;
 
 public class ReactiveThrottler {
     public static class RejectedException extends RuntimeException {
@@ -84,7 +83,6 @@ public class ReactiveThrottler {
         if (throttled != null) {
             Mono.from(throttled)
                 .doFinally(any -> onRequestDone())
-                .subscribeOn(Schedulers.parallel())
                 .subscribe();
         }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org