You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/06/25 02:56:07 UTC
[james-project] 01/10: JAMES-3184 Transform throttle into a
reusable transformation
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit bd9147abd06475480f8fad41d3aba6d4d8169716
Author: Matthieu Baechler <ma...@apache.org>
AuthorDate: Fri May 29 11:30:28 2020 +0200
JAMES-3184 Transform throttle into a reusable transformation
---
.../task/SolveMessageInconsistenciesService.java | 24 +++++----
.../mailbox/tools/indexer/ReIndexerPerformer.java | 8 +--
.../quota/task/RecomputeCurrentQuotasService.java | 10 ++--
.../java/org/apache/james/util/ReactorUtils.java | 55 +++++++++----------
.../org/apache/james/util/ReactorUtilsTest.java | 62 +++++++++++++++-------
.../jmap/MessageFastViewProjectionCorrector.java | 7 +--
6 files changed, 97 insertions(+), 69 deletions(-)
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesService.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesService.java
index 7d492fa..5279d56 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesService.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesService.java
@@ -423,11 +423,13 @@ public class SolveMessageInconsistenciesService {
}
private Flux<Task.Result> fixInconsistenciesInImapUid(Context context, RunningOptions runningOptions) {
- return ReactorUtils.Throttler.forOperation(this::detectInconsistencyInImapUid)
- .window(runningOptions.getMessagesPerSecond(), PERIOD)
- .throttle(messageIdToImapUidDAO.retrieveAllMessages())
- .doOnNext(any -> context.incrementProcessedImapUidEntries())
- .flatMap(inconsistency -> inconsistency.fix(context, messageIdToImapUidDAO, messageIdDAO));
+ return messageIdToImapUidDAO.retrieveAllMessages()
+ .transform(ReactorUtils.<ComposedMessageIdWithMetaData, Task.Result>throttle()
+ .elements(runningOptions.getMessagesPerSecond())
+ .per(PERIOD)
+ .forOperation(metaData -> detectInconsistencyInImapUid(metaData)
+ .doOnNext(any -> context.incrementProcessedImapUidEntries())
+ .flatMap(inconsistency -> inconsistency.fix(context, messageIdToImapUidDAO, messageIdDAO))));
}
private Mono<Inconsistency> detectInconsistencyInImapUid(ComposedMessageIdWithMetaData message) {
@@ -468,11 +470,13 @@ public class SolveMessageInconsistenciesService {
}
private Flux<Task.Result> fixInconsistenciesInMessageId(Context context, RunningOptions runningOptions) {
- return ReactorUtils.Throttler.forOperation(this::detectInconsistencyInMessageId)
- .window(runningOptions.getMessagesPerSecond(), PERIOD)
- .throttle(messageIdDAO.retrieveAllMessages())
- .doOnNext(any -> context.incrementMessageIdEntries())
- .flatMap(inconsistency -> inconsistency.fix(context, messageIdToImapUidDAO, messageIdDAO));
+ return messageIdDAO.retrieveAllMessages()
+ .transform(ReactorUtils.<ComposedMessageIdWithMetaData, Task.Result>throttle()
+ .elements(runningOptions.getMessagesPerSecond())
+ .per(PERIOD)
+ .forOperation(metadata -> detectInconsistencyInMessageId(metadata)
+ .doOnNext(any -> context.incrementMessageIdEntries())
+ .flatMap(inconsistency -> inconsistency.fix(context, messageIdToImapUidDAO, messageIdDAO))));
}
private Mono<Inconsistency> detectInconsistencyInMessageId(ComposedMessageIdWithMetaData message) {
diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java
index 124238c..34d1a95 100644
--- a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java
+++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java
@@ -279,10 +279,10 @@ public class ReIndexerPerformer {
}
private Mono<Task.Result> reIndexMessages(Flux<Either<Failure, ReIndexingEntry>> entriesToIndex, RunningOptions runningOptions, ReprocessingContext reprocessingContext) {
- return ReactorUtils.Throttler.<Either<Failure, ReIndexingEntry>, Task.Result>forOperation(
- entry -> reIndex(entry, reprocessingContext))
- .window(runningOptions.getMessagesPerSecond(), Duration.ofSeconds(1))
- .throttle(entriesToIndex)
+ return entriesToIndex.transform(ReactorUtils.<Either<Failure, ReIndexingEntry>, Task.Result>throttle()
+ .elements(runningOptions.getMessagesPerSecond())
+ .per(Duration.ofSeconds(1))
+ .forOperation(entry -> reIndex(entry, reprocessingContext)))
.reduce(Task::combine)
.switchIfEmpty(Mono.just(Result.COMPLETED));
}
diff --git a/mailbox/tools/quota-recompute/src/main/java/org/apache/james/mailbox/quota/task/RecomputeCurrentQuotasService.java b/mailbox/tools/quota-recompute/src/main/java/org/apache/james/mailbox/quota/task/RecomputeCurrentQuotasService.java
index 57d7b72..ef49c23 100644
--- a/mailbox/tools/quota-recompute/src/main/java/org/apache/james/mailbox/quota/task/RecomputeCurrentQuotasService.java
+++ b/mailbox/tools/quota-recompute/src/main/java/org/apache/james/mailbox/quota/task/RecomputeCurrentQuotasService.java
@@ -47,7 +47,6 @@ import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
-import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class RecomputeCurrentQuotasService {
@@ -164,10 +163,11 @@ public class RecomputeCurrentQuotasService {
public Mono<Task.Result> recomputeCurrentQuotas(Context context, RunningOptions runningOptions) {
try {
- Flux<Username> users = Iterators.toFlux(usersRepository.list());
- return ReactorUtils.Throttler.<Username, Task.Result>forOperation(username -> recomputeUserCurrentQuotas(context, username))
- .window(runningOptions.getUsersPerSecond(), Duration.ofSeconds(1))
- .throttle(users)
+ return Iterators.toFlux(usersRepository.list())
+ .transform(ReactorUtils.<Username, Task.Result>throttle()
+ .elements(runningOptions.getUsersPerSecond())
+ .per(Duration.ofSeconds(1))
+ .forOperation(username -> recomputeUserCurrentQuotas(context, username)))
.reduce(Task.Result.COMPLETED, Task::combine);
} catch (UsersRepositoryException e) {
LOGGER.error("Error while accessing users from repository", e);
diff --git a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
index a7bed0e..1548d1d 100644
--- a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
+++ b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
@@ -44,38 +44,35 @@ public class ReactorUtils {
public static final String MDC_KEY_PREFIX = "MDC-";
- public static class Throttler<T, U> {
- private static final Duration DELAY = Duration.ZERO;
-
- public static <T, U> RequiresWindowingParameters<T, U> forOperation(Function<T, Publisher<U>> operation) {
- return (maxSize, duration) -> new Throttler<>(operation, maxSize, duration);
- }
-
- @FunctionalInterface
- public interface RequiresWindowingParameters<T, U> {
- Throttler<T, U> window(int maxSize, Duration duration);
- }
-
- private Throttler(Function<T, Publisher<U>> operation, int windowMaxSize, Duration windowDuration) {
- Preconditions.checkArgument(windowMaxSize > 0, "'windowMaxSize' must be strictly positive");
- Preconditions.checkArgument(!windowDuration.isNegative(), "'windowDuration' must be strictly positive");
- Preconditions.checkArgument(!windowDuration.isZero(), "'windowDuration' must be strictly positive");
+ private static final Duration DELAY = Duration.ZERO;
+
+ public static <T, U> RequiresQuantity<T, U> throttle() {
+ return elements -> duration -> operation -> {
+ Preconditions.checkArgument(elements > 0, "'windowMaxSize' must be strictly positive");
+ Preconditions.checkArgument(!duration.isNegative(), "'windowDuration' must be strictly positive");
+ Preconditions.checkArgument(!duration.isZero(), "'windowDuration' must be strictly positive");
+
+ return flux -> flux
+ .windowTimeout(elements, duration)
+ .zipWith(Flux.interval(DELAY, duration))
+ .flatMap(Tuple2::getT1, elements, elements)
+ .flatMap(operation, elements);
+ };
+ }
- this.operation = operation;
- this.windowMaxSize = windowMaxSize;
- this.windowDuration = windowDuration;
- }
+ @FunctionalInterface
+ public interface RequiresQuantity<T, U> {
+ RequiresPeriod<T, U> elements(int maxSize);
+ }
- private final Function<T, Publisher<U>> operation;
- private final int windowMaxSize;
- private final Duration windowDuration;
+ @FunctionalInterface
+ public interface RequiresPeriod<T, U> {
+ RequiresOperation<T, U> per(Duration duration);
+ }
- public Flux<U> throttle(Flux<T> flux) {
- return flux.windowTimeout(windowMaxSize, windowDuration)
- .zipWith(Flux.interval(DELAY, windowDuration))
- .flatMap(Tuple2::getT1)
- .flatMap(operation, windowMaxSize);
- }
+ @FunctionalInterface
+ public interface RequiresOperation<T, U> {
+ Function<Flux<T>, Flux<U>> forOperation(Function<T, Publisher<U>> operation);
}
public static <T> Mono<T> executeAndEmpty(Runnable runnable) {
diff --git a/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java b/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
index 984c165..dc286ad 100644
--- a/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
+++ b/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
@@ -19,6 +19,7 @@
package org.apache.james.util;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import java.io.ByteArrayInputStream;
@@ -34,7 +35,6 @@ import java.util.function.Function;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.RandomStringUtils;
-import org.apache.james.util.ReactorUtils.Throttler;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.reactivestreams.Publisher;
@@ -56,29 +56,37 @@ class ReactorUtilsTest {
class Throttling {
@Test
void windowShouldThrowWhenMaxSizeIsNegative() {
- assertThatThrownBy(() -> Throttler.forOperation(Mono::just)
- .window(-1, Duration.ofSeconds(1)))
+ assertThatThrownBy(() -> ReactorUtils.<Integer, Integer>throttle()
+ .elements(-1)
+ .per(Duration.ofSeconds(1))
+ .forOperation(Mono::just))
.isInstanceOf(IllegalArgumentException.class);
}
@Test
void windowShouldThrowWhenMaxSizeIsZero() {
- assertThatThrownBy(() -> Throttler.forOperation(Mono::just)
- .window(0, Duration.ofSeconds(1)))
+ assertThatThrownBy(() -> ReactorUtils.throttle()
+ .elements(0)
+ .per(Duration.ofSeconds(1))
+ .forOperation(Mono::just))
.isInstanceOf(IllegalArgumentException.class);
}
@Test
void windowShouldThrowWhenDurationIsNegative() {
- assertThatThrownBy(() -> Throttler.forOperation(Mono::just)
- .window(1, Duration.ofSeconds(-1)))
+ assertThatThrownBy(() -> ReactorUtils.throttle()
+ .elements(1)
+ .per(Duration.ofSeconds(-1))
+ .forOperation(Mono::just))
.isInstanceOf(IllegalArgumentException.class);
}
@Test
void windowShouldThrowWhenDurationIsZero() {
- assertThatThrownBy(() -> Throttler.forOperation(Mono::just)
- .window(1, Duration.ZERO))
+ assertThatThrownBy(() -> ReactorUtils.throttle()
+ .elements(1)
+ .per(Duration.ofSeconds(0))
+ .forOperation(Mono::just))
.isInstanceOf(IllegalArgumentException.class);
}
@@ -89,13 +97,13 @@ class ReactorUtilsTest {
Stopwatch stopwatch = Stopwatch.createUnstarted();
- Flux<Integer> originalFlux = Flux.range(0, 10);
- ImmutableList<Long> windowMembership = Throttler.<Integer, Long>forOperation(
- i -> Mono.fromCallable(() -> stopwatch.elapsed(TimeUnit.MILLISECONDS)))
- .window(windowMaxSize, windowDuration)
- .throttle(originalFlux)
- .doOnSubscribe(signal -> stopwatch.start())
+ ImmutableList<Long> windowMembership = Flux.range(0, 10)
+ .transform(ReactorUtils.<Integer, Long>throttle()
+ .elements(windowMaxSize)
+ .per(windowDuration)
+ .forOperation(i -> Mono.fromCallable(() -> stopwatch.elapsed(TimeUnit.MILLISECONDS))))
.map(i -> i / 100)
+ .doOnSubscribe(signal -> stopwatch.start())
.collect(Guavate.toImmutableList())
.block();
@@ -104,6 +112,22 @@ class ReactorUtilsTest {
}
@Test
+ void largeWindowShouldNotOverrunIntermediateBuffers() {
+ // windowMaxSize exceeds Queues.SMALL_BUFFER_SIZE & Queues.SMALL_BUFFER_SIZE (256 by default)
+ // Combined with slow operations, this ensures we are not filling up intermediate buffers.
+ int windowMaxSize = 3_000;
+ Duration windowDuration = Duration.ofMillis(100);
+
+ assertThatCode(() -> Flux.range(0, 10_000)
+ .transform(ReactorUtils.<Integer, Long>throttle()
+ .elements(windowMaxSize)
+ .per(windowDuration)
+ .forOperation(i -> Mono.delay(windowDuration.multipliedBy(2))))
+ .blockLast())
+ .doesNotThrowAnyException();
+ }
+
+ @Test
void throttleDownStreamConcurrencyShouldNotExceedWindowMaxSize() {
int windowMaxSize = 3;
Duration windowDuration = Duration.ofMillis(100);
@@ -116,9 +140,11 @@ class ReactorUtilsTest {
.flatMap(i -> Mono.delay(windowDuration.multipliedBy(2)).thenReturn(i))
.flatMap(i -> Mono.fromRunnable(ongoingProcessing::decrementAndGet).thenReturn(i));
- ImmutableList<Integer> ongoingProcessingUponComputationStart = Throttler.forOperation(longRunningOperation)
- .window(windowMaxSize, windowDuration)
- .throttle(originalFlux)
+ ImmutableList<Integer> ongoingProcessingUponComputationStart = originalFlux
+ .transform(ReactorUtils.<Integer, Integer>throttle()
+ .elements(windowMaxSize)
+ .per(windowDuration)
+ .forOperation(longRunningOperation))
.collect(Guavate.toImmutableList())
.block();
diff --git a/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/MessageFastViewProjectionCorrector.java b/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/MessageFastViewProjectionCorrector.java
index f23e13c..b54aaaf 100644
--- a/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/MessageFastViewProjectionCorrector.java
+++ b/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/MessageFastViewProjectionCorrector.java
@@ -215,9 +215,10 @@ public class MessageFastViewProjectionCorrector {
}
private Mono<Result> correctProjection(Flux<ProjectionEntry> entries, RunningOptions runningOptions, Progress progress) {
- return ReactorUtils.Throttler.<ProjectionEntry, Result>forOperation(entry -> correctProjection(entry, progress))
- .window(runningOptions.getMessagesPerSecond(), PERIOD)
- .throttle(entries)
+ return entries.transform(ReactorUtils.<ProjectionEntry, Task.Result>throttle()
+ .elements(runningOptions.getMessagesPerSecond())
+ .per(PERIOD)
+ .forOperation(entry -> correctProjection(entry, progress)))
.reduce(Task::combine)
.switchIfEmpty(Mono.just(Result.COMPLETED));
}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org