You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/06/25 02:56:07 UTC

[james-project] 01/10: JAMES-3184 Transform throttle into a reusable transformation

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit bd9147abd06475480f8fad41d3aba6d4d8169716
Author: Matthieu Baechler <ma...@apache.org>
AuthorDate: Fri May 29 11:30:28 2020 +0200

    JAMES-3184 Transform throttle into a reusable transformation
---
 .../task/SolveMessageInconsistenciesService.java   | 24 +++++----
 .../mailbox/tools/indexer/ReIndexerPerformer.java  |  8 +--
 .../quota/task/RecomputeCurrentQuotasService.java  | 10 ++--
 .../java/org/apache/james/util/ReactorUtils.java   | 55 +++++++++----------
 .../org/apache/james/util/ReactorUtilsTest.java    | 62 +++++++++++++++-------
 .../jmap/MessageFastViewProjectionCorrector.java   |  7 +--
 6 files changed, 97 insertions(+), 69 deletions(-)

diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesService.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesService.java
index 7d492fa..5279d56 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesService.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesService.java
@@ -423,11 +423,13 @@ public class SolveMessageInconsistenciesService {
     }
 
     private Flux<Task.Result> fixInconsistenciesInImapUid(Context context, RunningOptions runningOptions) {
-        return ReactorUtils.Throttler.forOperation(this::detectInconsistencyInImapUid)
-            .window(runningOptions.getMessagesPerSecond(), PERIOD)
-            .throttle(messageIdToImapUidDAO.retrieveAllMessages())
-            .doOnNext(any -> context.incrementProcessedImapUidEntries())
-            .flatMap(inconsistency -> inconsistency.fix(context, messageIdToImapUidDAO, messageIdDAO));
+        return messageIdToImapUidDAO.retrieveAllMessages()
+            .transform(ReactorUtils.<ComposedMessageIdWithMetaData, Task.Result>throttle()
+                .elements(runningOptions.getMessagesPerSecond())
+                .per(PERIOD)
+                .forOperation(metaData -> detectInconsistencyInImapUid(metaData)
+                    .doOnNext(any -> context.incrementProcessedImapUidEntries())
+                    .flatMap(inconsistency -> inconsistency.fix(context, messageIdToImapUidDAO, messageIdDAO))));
     }
 
     private Mono<Inconsistency> detectInconsistencyInImapUid(ComposedMessageIdWithMetaData message) {
@@ -468,11 +470,13 @@ public class SolveMessageInconsistenciesService {
     }
 
     private Flux<Task.Result> fixInconsistenciesInMessageId(Context context, RunningOptions runningOptions) {
-        return ReactorUtils.Throttler.forOperation(this::detectInconsistencyInMessageId)
-            .window(runningOptions.getMessagesPerSecond(), PERIOD)
-            .throttle(messageIdDAO.retrieveAllMessages())
-            .doOnNext(any -> context.incrementMessageIdEntries())
-            .flatMap(inconsistency -> inconsistency.fix(context, messageIdToImapUidDAO, messageIdDAO));
+        return messageIdDAO.retrieveAllMessages()
+            .transform(ReactorUtils.<ComposedMessageIdWithMetaData, Task.Result>throttle()
+                .elements(runningOptions.getMessagesPerSecond())
+                .per(PERIOD)
+                .forOperation(metadata -> detectInconsistencyInMessageId(metadata)
+                    .doOnNext(any -> context.incrementMessageIdEntries())
+                    .flatMap(inconsistency -> inconsistency.fix(context, messageIdToImapUidDAO, messageIdDAO))));
     }
 
     private Mono<Inconsistency> detectInconsistencyInMessageId(ComposedMessageIdWithMetaData message) {
diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java
index 124238c..34d1a95 100644
--- a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java
+++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java
@@ -279,10 +279,10 @@ public class ReIndexerPerformer {
     }
 
     private Mono<Task.Result> reIndexMessages(Flux<Either<Failure, ReIndexingEntry>> entriesToIndex, RunningOptions runningOptions, ReprocessingContext reprocessingContext) {
-        return ReactorUtils.Throttler.<Either<Failure, ReIndexingEntry>, Task.Result>forOperation(
-                entry -> reIndex(entry, reprocessingContext))
-            .window(runningOptions.getMessagesPerSecond(), Duration.ofSeconds(1))
-            .throttle(entriesToIndex)
+        return entriesToIndex.transform(ReactorUtils.<Either<Failure, ReIndexingEntry>, Task.Result>throttle()
+                .elements(runningOptions.getMessagesPerSecond())
+                .per(Duration.ofSeconds(1))
+                .forOperation(entry -> reIndex(entry, reprocessingContext)))
             .reduce(Task::combine)
             .switchIfEmpty(Mono.just(Result.COMPLETED));
     }
diff --git a/mailbox/tools/quota-recompute/src/main/java/org/apache/james/mailbox/quota/task/RecomputeCurrentQuotasService.java b/mailbox/tools/quota-recompute/src/main/java/org/apache/james/mailbox/quota/task/RecomputeCurrentQuotasService.java
index 57d7b72..ef49c23 100644
--- a/mailbox/tools/quota-recompute/src/main/java/org/apache/james/mailbox/quota/task/RecomputeCurrentQuotasService.java
+++ b/mailbox/tools/quota-recompute/src/main/java/org/apache/james/mailbox/quota/task/RecomputeCurrentQuotasService.java
@@ -47,7 +47,6 @@ import com.google.common.base.MoreObjects;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 
-import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 public class RecomputeCurrentQuotasService {
@@ -164,10 +163,11 @@ public class RecomputeCurrentQuotasService {
 
     public Mono<Task.Result> recomputeCurrentQuotas(Context context, RunningOptions runningOptions) {
         try {
-            Flux<Username> users = Iterators.toFlux(usersRepository.list());
-            return ReactorUtils.Throttler.<Username, Task.Result>forOperation(username -> recomputeUserCurrentQuotas(context, username))
-                .window(runningOptions.getUsersPerSecond(), Duration.ofSeconds(1))
-                .throttle(users)
+            return Iterators.toFlux(usersRepository.list())
+                .transform(ReactorUtils.<Username, Task.Result>throttle()
+                    .elements(runningOptions.getUsersPerSecond())
+                    .per(Duration.ofSeconds(1))
+                    .forOperation(username -> recomputeUserCurrentQuotas(context, username)))
                 .reduce(Task.Result.COMPLETED, Task::combine);
         } catch (UsersRepositoryException e) {
             LOGGER.error("Error while accessing users from repository", e);
diff --git a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
index a7bed0e..1548d1d 100644
--- a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
+++ b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
@@ -44,38 +44,35 @@ public class ReactorUtils {
 
     public static final String MDC_KEY_PREFIX = "MDC-";
 
-    public static class Throttler<T, U> {
-        private static final Duration DELAY = Duration.ZERO;
-
-        public static <T, U> RequiresWindowingParameters<T, U> forOperation(Function<T, Publisher<U>> operation) {
-            return (maxSize, duration) -> new Throttler<>(operation, maxSize, duration);
-        }
-
-        @FunctionalInterface
-        public interface RequiresWindowingParameters<T, U> {
-            Throttler<T, U> window(int maxSize, Duration duration);
-        }
-
-        private Throttler(Function<T, Publisher<U>> operation, int windowMaxSize, Duration windowDuration) {
-            Preconditions.checkArgument(windowMaxSize > 0, "'windowMaxSize' must be strictly positive");
-            Preconditions.checkArgument(!windowDuration.isNegative(), "'windowDuration' must be strictly positive");
-            Preconditions.checkArgument(!windowDuration.isZero(), "'windowDuration' must be strictly positive");
+    private static final Duration DELAY = Duration.ZERO;
+
+    public static <T, U> RequiresQuantity<T, U> throttle() {
+        return elements -> duration -> operation -> {
+            Preconditions.checkArgument(elements > 0, "'windowMaxSize' must be strictly positive");
+            Preconditions.checkArgument(!duration.isNegative(), "'windowDuration' must be strictly positive");
+            Preconditions.checkArgument(!duration.isZero(), "'windowDuration' must be strictly positive");
+
+            return flux -> flux
+                .windowTimeout(elements, duration)
+                .zipWith(Flux.interval(DELAY, duration))
+                .flatMap(Tuple2::getT1, elements, elements)
+                .flatMap(operation, elements);
+        };
+    }
 
-            this.operation = operation;
-            this.windowMaxSize = windowMaxSize;
-            this.windowDuration = windowDuration;
-        }
+    @FunctionalInterface
+    public interface RequiresQuantity<T, U> {
+        RequiresPeriod<T, U> elements(int maxSize);
+    }
 
-        private final Function<T, Publisher<U>> operation;
-        private final int windowMaxSize;
-        private final Duration windowDuration;
+    @FunctionalInterface
+    public interface RequiresPeriod<T, U> {
+        RequiresOperation<T, U> per(Duration duration);
+    }
 
-        public Flux<U> throttle(Flux<T> flux) {
-            return flux.windowTimeout(windowMaxSize, windowDuration)
-                .zipWith(Flux.interval(DELAY, windowDuration))
-                .flatMap(Tuple2::getT1)
-                .flatMap(operation, windowMaxSize);
-        }
+    @FunctionalInterface
+    public interface RequiresOperation<T, U> {
+        Function<Flux<T>, Flux<U>> forOperation(Function<T, Publisher<U>> operation);
     }
 
     public static <T> Mono<T> executeAndEmpty(Runnable runnable) {
diff --git a/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java b/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
index 984c165..dc286ad 100644
--- a/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
+++ b/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
@@ -19,6 +19,7 @@
 package org.apache.james.util;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 import java.io.ByteArrayInputStream;
@@ -34,7 +35,6 @@ import java.util.function.Function;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.RandomStringUtils;
-import org.apache.james.util.ReactorUtils.Throttler;
 import org.junit.jupiter.api.Nested;
 import org.junit.jupiter.api.Test;
 import org.reactivestreams.Publisher;
@@ -56,29 +56,37 @@ class ReactorUtilsTest {
     class Throttling {
         @Test
         void windowShouldThrowWhenMaxSizeIsNegative() {
-            assertThatThrownBy(() -> Throttler.forOperation(Mono::just)
-                    .window(-1, Duration.ofSeconds(1)))
+            assertThatThrownBy(() -> ReactorUtils.<Integer, Integer>throttle()
+                    .elements(-1)
+                    .per(Duration.ofSeconds(1))
+                    .forOperation(Mono::just))
                 .isInstanceOf(IllegalArgumentException.class);
         }
 
         @Test
         void windowShouldThrowWhenMaxSizeIsZero() {
-            assertThatThrownBy(() -> Throttler.forOperation(Mono::just)
-                    .window(0, Duration.ofSeconds(1)))
+            assertThatThrownBy(() -> ReactorUtils.throttle()
+                    .elements(0)
+                    .per(Duration.ofSeconds(1))
+                    .forOperation(Mono::just))
                 .isInstanceOf(IllegalArgumentException.class);
         }
 
         @Test
         void windowShouldThrowWhenDurationIsNegative() {
-            assertThatThrownBy(() -> Throttler.forOperation(Mono::just)
-                    .window(1, Duration.ofSeconds(-1)))
+            assertThatThrownBy(() -> ReactorUtils.throttle()
+                    .elements(1)
+                    .per(Duration.ofSeconds(-1))
+                    .forOperation(Mono::just))
                 .isInstanceOf(IllegalArgumentException.class);
         }
 
         @Test
         void windowShouldThrowWhenDurationIsZero() {
-            assertThatThrownBy(() -> Throttler.forOperation(Mono::just)
-                    .window(1, Duration.ZERO))
+            assertThatThrownBy(() -> ReactorUtils.throttle()
+                    .elements(1)
+                    .per(Duration.ofSeconds(0))
+                    .forOperation(Mono::just))
                 .isInstanceOf(IllegalArgumentException.class);
         }
 
@@ -89,13 +97,13 @@ class ReactorUtilsTest {
 
             Stopwatch stopwatch = Stopwatch.createUnstarted();
 
-            Flux<Integer> originalFlux = Flux.range(0, 10);
-            ImmutableList<Long> windowMembership = Throttler.<Integer, Long>forOperation(
-                    i -> Mono.fromCallable(() -> stopwatch.elapsed(TimeUnit.MILLISECONDS)))
-                .window(windowMaxSize, windowDuration)
-                .throttle(originalFlux)
-                .doOnSubscribe(signal -> stopwatch.start())
+            ImmutableList<Long> windowMembership = Flux.range(0, 10)
+                .transform(ReactorUtils.<Integer, Long>throttle()
+                    .elements(windowMaxSize)
+                    .per(windowDuration)
+                    .forOperation(i -> Mono.fromCallable(() -> stopwatch.elapsed(TimeUnit.MILLISECONDS))))
                 .map(i -> i / 100)
+                .doOnSubscribe(signal -> stopwatch.start())
                 .collect(Guavate.toImmutableList())
                 .block();
 
@@ -104,6 +112,22 @@ class ReactorUtilsTest {
         }
 
         @Test
+        void largeWindowShouldNotOverrunIntermediateBuffers() {
+            // windowMaxSize exceeds Queues.SMALL_BUFFER_SIZE & Queues.SMALL_BUFFER_SIZE (256 by default)
+            // Combined with slow operations, this ensures we are not filling up intermediate buffers.
+            int windowMaxSize = 3_000;
+            Duration windowDuration = Duration.ofMillis(100);
+
+            assertThatCode(() -> Flux.range(0, 10_000)
+                    .transform(ReactorUtils.<Integer, Long>throttle()
+                        .elements(windowMaxSize)
+                        .per(windowDuration)
+                        .forOperation(i -> Mono.delay(windowDuration.multipliedBy(2))))
+                    .blockLast())
+                .doesNotThrowAnyException();
+        }
+
+        @Test
         void throttleDownStreamConcurrencyShouldNotExceedWindowMaxSize() {
             int windowMaxSize = 3;
             Duration windowDuration = Duration.ofMillis(100);
@@ -116,9 +140,11 @@ class ReactorUtilsTest {
                     .flatMap(i -> Mono.delay(windowDuration.multipliedBy(2)).thenReturn(i))
                     .flatMap(i -> Mono.fromRunnable(ongoingProcessing::decrementAndGet).thenReturn(i));
 
-            ImmutableList<Integer> ongoingProcessingUponComputationStart = Throttler.forOperation(longRunningOperation)
-                .window(windowMaxSize, windowDuration)
-                .throttle(originalFlux)
+            ImmutableList<Integer> ongoingProcessingUponComputationStart = originalFlux
+                .transform(ReactorUtils.<Integer, Integer>throttle()
+                    .elements(windowMaxSize)
+                    .per(windowDuration)
+                    .forOperation(longRunningOperation))
                 .collect(Guavate.toImmutableList())
                 .block();
 
diff --git a/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/MessageFastViewProjectionCorrector.java b/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/MessageFastViewProjectionCorrector.java
index f23e13c..b54aaaf 100644
--- a/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/MessageFastViewProjectionCorrector.java
+++ b/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/MessageFastViewProjectionCorrector.java
@@ -215,9 +215,10 @@ public class MessageFastViewProjectionCorrector {
     }
 
     private Mono<Result> correctProjection(Flux<ProjectionEntry> entries, RunningOptions runningOptions, Progress progress) {
-        return ReactorUtils.Throttler.<ProjectionEntry, Result>forOperation(entry -> correctProjection(entry, progress))
-            .window(runningOptions.getMessagesPerSecond(), PERIOD)
-            .throttle(entries)
+        return entries.transform(ReactorUtils.<ProjectionEntry, Task.Result>throttle()
+                .elements(runningOptions.getMessagesPerSecond())
+                .per(PERIOD)
+                .forOperation(entry -> correctProjection(entry, progress)))
             .reduce(Task::combine)
             .switchIfEmpty(Mono.just(Result.COMPLETED));
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org