You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by rc...@apache.org on 2022/09/19 02:58:26 UTC

[james-project] 01/07: JAMES-3816 throttler for reactive workloads

This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 7897213d55a8fcfa098e83e3386b1744afd52fb6
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Mon Sep 5 14:27:41 2022 +0700

    JAMES-3816 throttler for reactive workloads
---
 .../james/imapserver/netty/ReactiveThrottler.java  |  86 ++++++++++++++
 .../imapserver/netty/ReactiveThrottlerTest.java    | 124 +++++++++++++++++++++
 2 files changed, 210 insertions(+)

diff --git a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ReactiveThrottler.java b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ReactiveThrottler.java
new file mode 100644
index 0000000000..5e2b607c01
--- /dev/null
+++ b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ReactiveThrottler.java
@@ -0,0 +1,86 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.imapserver.netty;
+
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.james.metrics.api.GaugeRegistry;
+import org.reactivestreams.Publisher;
+
+import reactor.core.publisher.Mono;
+import reactor.core.publisher.Sinks;
+
+public class ReactiveThrottler {
+    public static class RejectedException extends RuntimeException {
+        public RejectedException(String message) {
+            super(message);
+        }
+    }
+
+    private final int maxConcurrentRequests;
+    private final int maxQueueSize;
+    // In flight + executing
+    private final AtomicInteger concurrentRequests = new AtomicInteger(0);
+    private final Queue<Publisher<Void>> queue = new ConcurrentLinkedQueue<>();
+
+    public ReactiveThrottler(GaugeRegistry gaugeRegistry, int maxConcurrentRequests, int maxQueueSize) {
+        gaugeRegistry.register("imap.request.queue.size", () -> Math.max(concurrentRequests.get() - maxConcurrentRequests, 0));
+
+        this.maxConcurrentRequests = maxConcurrentRequests;
+        this.maxQueueSize = maxQueueSize;
+    }
+
+    public Publisher<Void> throttle(Publisher<Void> task) {
+        int requestNumber = concurrentRequests.incrementAndGet();
+
+        if (requestNumber <= maxConcurrentRequests) {
+            // We have capacity for one more concurrent request
+            return Mono.from(task)
+                .then(Mono.defer(this::onRequestDone));
+        } else if (requestNumber <= maxQueueSize + maxConcurrentRequests) {
+            // Queue the request for later
+            Sinks.One<Void> one = Sinks.one();
+            queue.add(Mono.from(task)
+                .then(Mono.fromRunnable(() -> one.emitEmpty(Sinks.EmitFailureHandler.FAIL_FAST))));
+            // Let the caller await task completion
+            return one.asMono();
+        } else {
+            concurrentRequests.decrementAndGet();
+
+            return Mono.error(new RejectedException(
+                String.format(
+                    "The IMAP server has reached its maximum capacity "
+                        + "(concurrent requests: %d, queue size: %d)",
+                    maxConcurrentRequests, maxQueueSize)));
+        }
+    }
+
+    private Mono<Void> onRequestDone() {
+        concurrentRequests.decrementAndGet();
+        Publisher<Void> throttled = queue.poll();
+        if (throttled != null) {
+            return Mono.from(throttled)
+                .then(Mono.defer(this::onRequestDone));
+        }
+        return Mono.empty();
+    }
+}
diff --git a/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/ReactiveThrottlerTest.java b/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/ReactiveThrottlerTest.java
new file mode 100644
index 0000000000..e8d626c2dc
--- /dev/null
+++ b/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/ReactiveThrottlerTest.java
@@ -0,0 +1,124 @@
+package org.apache.james.imapserver.netty;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.time.Duration;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.james.metrics.api.NoopGaugeRegistry;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+
+class ReactiveThrottlerTest {
+    @Test
+    void throttleShouldExecuteSubmittedTasks() {
+        // Given a throttler
+        ReactiveThrottler testee = new ReactiveThrottler(new NoopGaugeRegistry(), 2, 2);
+
+        // When I submit a task
+        AtomicBoolean executed = new AtomicBoolean(false);
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(Mono.fromRunnable(() -> executed.getAndSet(true))))).block();
+
+        // Then that task is executed
+        assertThat(executed.get()).isTrue();
+    }
+
+    @Test
+    void throttleShouldNotExecuteQueuedTasksLogicRightAway() {
+        // Given a throttler
+        ReactiveThrottler testee = new ReactiveThrottler(new NoopGaugeRegistry(), 2, 2);
+
+        // When I submit many tasks task
+        AtomicBoolean executed = new AtomicBoolean(false);
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(200)).then())).subscribeOn(Schedulers.elastic()).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(200)).then())).subscribeOn(Schedulers.elastic()).subscribe();
+        Mono.from(testee.throttle(Mono.fromRunnable(() -> executed.getAndSet(true)))).subscribeOn(Schedulers.elastic()).subscribe();
+
+        // Then that task is not executed straight away
+        assertThat(executed.get()).isFalse();
+    }
+
+    @Test
+    void throttleShouldEventuallyExecuteQueuedTasks() {
+        // Given a throttler
+        ReactiveThrottler testee = new ReactiveThrottler(new NoopGaugeRegistry(), 2, 2);
+
+        // When I submit many tasks task
+        AtomicBoolean executed = new AtomicBoolean(false);
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then())).subscribeOn(Schedulers.elastic()).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then())).subscribeOn(Schedulers.elastic()).subscribe();
+        Mono.from(testee.throttle(Mono.fromRunnable(() -> executed.getAndSet(true)))).subscribeOn(Schedulers.elastic()).subscribe();
+
+        // Then that task is eventually executed
+        Awaitility.await().atMost(Duration.ofSeconds(10))
+            .untilAsserted(() -> assertThat(executed.get()).isTrue());
+    }
+
+    @Test
+    void throttleShouldCompleteWhenSubmittedTaskCompletes() {
+        // Given a throttler
+        ReactiveThrottler testee = new ReactiveThrottler(new NoopGaugeRegistry(), 2, 2);
+
+        // When I await a submitted task execution and it is queued
+        AtomicBoolean executed = new AtomicBoolean(false);
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then())).subscribeOn(Schedulers.elastic()).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then())).subscribeOn(Schedulers.elastic()).subscribe();
+        Mono.from(testee.throttle(Mono.fromRunnable(() -> executed.getAndSet(true)))).block();
+
+        // Then when done that task have been executed
+        assertThat(executed.get()).isTrue();
+    }
+
+    @Test
+    void throttleShouldRejectTasksWhenTheQueueIsFull() {
+        // Given a throttler
+        ReactiveThrottler testee = new ReactiveThrottler(new NoopGaugeRegistry(), 2, 2);
+
+        // When I submit too many tasks task
+        AtomicBoolean executed = new AtomicBoolean(false);
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then())).subscribeOn(Schedulers.elastic()).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then())).subscribeOn(Schedulers.elastic()).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then())).subscribeOn(Schedulers.elastic()).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then())).subscribeOn(Schedulers.elastic()).subscribe();
+
+        // Then extra tasks are rejected
+        assertThatThrownBy(() -> Mono.from(testee.throttle(Mono.fromRunnable(() -> executed.getAndSet(true)))).block())
+            .isInstanceOf(ReactiveThrottler.RejectedException.class);
+        // And the task is not executed
+        assertThat(executed.get()).isFalse();
+    }
+
+    @Test
+    void throttleShouldNotExceedItsConcurrency() {
+        // Given a throttler
+        ReactiveThrottler testee = new ReactiveThrottler(new NoopGaugeRegistry(), 2, 2);
+
+        // When I submit many tasks task
+        AtomicInteger concurrentTasks = new AtomicInteger(0);
+        ConcurrentLinkedDeque<Integer> concurrentTasksCountSnapshots = new ConcurrentLinkedDeque<>();
+
+        Mono<Void> operation = Mono.fromRunnable(() -> {
+            int i = concurrentTasks.incrementAndGet();
+            concurrentTasksCountSnapshots.add(i);
+        }).then(Mono.delay(Duration.ofMillis(50)))
+            .then(Mono.fromRunnable(() -> {
+                int i = concurrentTasks.getAndDecrement();
+                concurrentTasksCountSnapshots.add(i);
+            }));
+        Mono.from(testee.throttle(operation)).subscribeOn(Schedulers.elastic()).subscribe();
+        Mono.from(testee.throttle(operation)).subscribeOn(Schedulers.elastic()).subscribe();
+        Mono.from(testee.throttle(operation)).subscribeOn(Schedulers.elastic()).subscribe();
+        Mono.from(testee.throttle(operation)).subscribeOn(Schedulers.elastic()).subscribe();
+
+        // Then maximum parallelism is not exceeded
+        Awaitility.await().untilAsserted(() -> assertThat(concurrentTasksCountSnapshots.size()).isEqualTo(8));
+        assertThat(concurrentTasksCountSnapshots)
+            .allSatisfy(i -> assertThat(i).isBetween(0, 2));
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org