You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by rc...@apache.org on 2022/09/19 02:58:26 UTC
[james-project] 01/07: JAMES-3816 throttler for reactive workloads
This is an automated email from the ASF dual-hosted git repository.
rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 7897213d55a8fcfa098e83e3386b1744afd52fb6
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Mon Sep 5 14:27:41 2022 +0700
JAMES-3816 throttler for reactive workloads
---
.../james/imapserver/netty/ReactiveThrottler.java | 86 ++++++++++++++
.../imapserver/netty/ReactiveThrottlerTest.java | 124 +++++++++++++++++++++
2 files changed, 210 insertions(+)
diff --git a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ReactiveThrottler.java b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ReactiveThrottler.java
new file mode 100644
index 0000000000..5e2b607c01
--- /dev/null
+++ b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ReactiveThrottler.java
@@ -0,0 +1,86 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.imapserver.netty;
+
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.james.metrics.api.GaugeRegistry;
+import org.reactivestreams.Publisher;
+
+import reactor.core.publisher.Mono;
+import reactor.core.publisher.Sinks;
+
+public class ReactiveThrottler {
+ public static class RejectedException extends RuntimeException {
+ public RejectedException(String message) {
+ super(message);
+ }
+ }
+
+ private final int maxConcurrentRequests;
+ private final int maxQueueSize;
+ // In flight + executing
+ private final AtomicInteger concurrentRequests = new AtomicInteger(0);
+ private final Queue<Publisher<Void>> queue = new ConcurrentLinkedQueue<>();
+
+ public ReactiveThrottler(GaugeRegistry gaugeRegistry, int maxConcurrentRequests, int maxQueueSize) {
+ gaugeRegistry.register("imap.request.queue.size", () -> Math.max(concurrentRequests.get() - maxConcurrentRequests, 0));
+
+ this.maxConcurrentRequests = maxConcurrentRequests;
+ this.maxQueueSize = maxQueueSize;
+ }
+
+ public Publisher<Void> throttle(Publisher<Void> task) {
+ int requestNumber = concurrentRequests.incrementAndGet();
+
+ if (requestNumber <= maxConcurrentRequests) {
+ // We have capacity for one more concurrent request
+ return Mono.from(task)
+ .then(Mono.defer(this::onRequestDone));
+ } else if (requestNumber <= maxQueueSize + maxConcurrentRequests) {
+ // Queue the request for later
+ Sinks.One<Void> one = Sinks.one();
+ queue.add(Mono.from(task)
+ .then(Mono.fromRunnable(() -> one.emitEmpty(Sinks.EmitFailureHandler.FAIL_FAST))));
+ // Let the caller await task completion
+ return one.asMono();
+ } else {
+ concurrentRequests.decrementAndGet();
+
+ return Mono.error(new RejectedException(
+ String.format(
+ "The IMAP server has reached its maximum capacity "
+ + "(concurrent requests: %d, queue size: %d)",
+ maxConcurrentRequests, maxQueueSize)));
+ }
+ }
+
+ private Mono<Void> onRequestDone() {
+ concurrentRequests.decrementAndGet();
+ Publisher<Void> throttled = queue.poll();
+ if (throttled != null) {
+ return Mono.from(throttled)
+ .then(Mono.defer(this::onRequestDone));
+ }
+ return Mono.empty();
+ }
+}
diff --git a/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/ReactiveThrottlerTest.java b/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/ReactiveThrottlerTest.java
new file mode 100644
index 0000000000..e8d626c2dc
--- /dev/null
+++ b/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/ReactiveThrottlerTest.java
@@ -0,0 +1,124 @@
+package org.apache.james.imapserver.netty;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.time.Duration;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.james.metrics.api.NoopGaugeRegistry;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+
+class ReactiveThrottlerTest {
+ @Test
+ void throttleShouldExecuteSubmittedTasks() {
+ // Given a throttler
+ ReactiveThrottler testee = new ReactiveThrottler(new NoopGaugeRegistry(), 2, 2);
+
+ // When I submit a task
+ AtomicBoolean executed = new AtomicBoolean(false);
+ Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(Mono.fromRunnable(() -> executed.getAndSet(true))))).block();
+
+ // Then that task is executed
+ assertThat(executed.get()).isTrue();
+ }
+
+ @Test
+ void throttleShouldNotExecuteQueuedTasksLogicRightAway() {
+ // Given a throttler
+ ReactiveThrottler testee = new ReactiveThrottler(new NoopGaugeRegistry(), 2, 2);
+
+ // When I submit many tasks task
+ AtomicBoolean executed = new AtomicBoolean(false);
+ Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(200)).then())).subscribeOn(Schedulers.elastic()).subscribe();
+ Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(200)).then())).subscribeOn(Schedulers.elastic()).subscribe();
+ Mono.from(testee.throttle(Mono.fromRunnable(() -> executed.getAndSet(true)))).subscribeOn(Schedulers.elastic()).subscribe();
+
+ // Then that task is not executed straight away
+ assertThat(executed.get()).isFalse();
+ }
+
+ @Test
+ void throttleShouldEventuallyExecuteQueuedTasks() {
+ // Given a throttler
+ ReactiveThrottler testee = new ReactiveThrottler(new NoopGaugeRegistry(), 2, 2);
+
+ // When I submit many tasks task
+ AtomicBoolean executed = new AtomicBoolean(false);
+ Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then())).subscribeOn(Schedulers.elastic()).subscribe();
+ Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then())).subscribeOn(Schedulers.elastic()).subscribe();
+ Mono.from(testee.throttle(Mono.fromRunnable(() -> executed.getAndSet(true)))).subscribeOn(Schedulers.elastic()).subscribe();
+
+ // Then that task is eventually executed
+ Awaitility.await().atMost(Duration.ofSeconds(10))
+ .untilAsserted(() -> assertThat(executed.get()).isTrue());
+ }
+
+ @Test
+ void throttleShouldCompleteWhenSubmittedTaskCompletes() {
+ // Given a throttler
+ ReactiveThrottler testee = new ReactiveThrottler(new NoopGaugeRegistry(), 2, 2);
+
+ // When I await a submitted task execution and it is queued
+ AtomicBoolean executed = new AtomicBoolean(false);
+ Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then())).subscribeOn(Schedulers.elastic()).subscribe();
+ Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then())).subscribeOn(Schedulers.elastic()).subscribe();
+ Mono.from(testee.throttle(Mono.fromRunnable(() -> executed.getAndSet(true)))).block();
+
+ // Then when done that task have been executed
+ assertThat(executed.get()).isTrue();
+ }
+
+ @Test
+ void throttleShouldRejectTasksWhenTheQueueIsFull() {
+ // Given a throttler
+ ReactiveThrottler testee = new ReactiveThrottler(new NoopGaugeRegistry(), 2, 2);
+
+ // When I submit too many tasks task
+ AtomicBoolean executed = new AtomicBoolean(false);
+ Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then())).subscribeOn(Schedulers.elastic()).subscribe();
+ Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then())).subscribeOn(Schedulers.elastic()).subscribe();
+ Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then())).subscribeOn(Schedulers.elastic()).subscribe();
+ Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then())).subscribeOn(Schedulers.elastic()).subscribe();
+
+ // Then extra tasks are rejected
+ assertThatThrownBy(() -> Mono.from(testee.throttle(Mono.fromRunnable(() -> executed.getAndSet(true)))).block())
+ .isInstanceOf(ReactiveThrottler.RejectedException.class);
+ // And the task is not executed
+ assertThat(executed.get()).isFalse();
+ }
+
+ @Test
+ void throttleShouldNotExceedItsConcurrency() {
+ // Given a throttler
+ ReactiveThrottler testee = new ReactiveThrottler(new NoopGaugeRegistry(), 2, 2);
+
+ // When I submit many tasks task
+ AtomicInteger concurrentTasks = new AtomicInteger(0);
+ ConcurrentLinkedDeque<Integer> concurrentTasksCountSnapshots = new ConcurrentLinkedDeque<>();
+
+ Mono<Void> operation = Mono.fromRunnable(() -> {
+ int i = concurrentTasks.incrementAndGet();
+ concurrentTasksCountSnapshots.add(i);
+ }).then(Mono.delay(Duration.ofMillis(50)))
+ .then(Mono.fromRunnable(() -> {
+ int i = concurrentTasks.getAndDecrement();
+ concurrentTasksCountSnapshots.add(i);
+ }));
+ Mono.from(testee.throttle(operation)).subscribeOn(Schedulers.elastic()).subscribe();
+ Mono.from(testee.throttle(operation)).subscribeOn(Schedulers.elastic()).subscribe();
+ Mono.from(testee.throttle(operation)).subscribeOn(Schedulers.elastic()).subscribe();
+ Mono.from(testee.throttle(operation)).subscribeOn(Schedulers.elastic()).subscribe();
+
+ // Then maximum parallelism is not exceeded
+ Awaitility.await().untilAsserted(() -> assertThat(concurrentTasksCountSnapshots.size()).isEqualTo(8));
+ assertThat(concurrentTasksCountSnapshots)
+ .allSatisfy(i -> assertThat(i).isBetween(0, 2));
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org