You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/07/03 02:04:03 UTC

[james-project] 08/15: JAMES-3184 Throttling should work for long running streams

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit bddf322649f609ec7063121517d93eb684a8492a
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Tue Jun 30 11:00:17 2020 +0700

    JAMES-3184 Throttling should work for long running streams
    
    As described in https://github.com/reactor/reactor-core/issues/1099
    windowTimeout does not play well with backpressure. When the number
    of windows exceeds the internal buffer size (32), the throttling crashes.
    
    We replaced "windowTimeout" with "window" and provide additional
    tests covering the case where the upstream does not manage to keep
    up with the throttling (to guard against regressions).
---
 server/container/util/pom.xml                      |   5 +
 .../java/org/apache/james/util/ReactorUtils.java   |   2 +-
 .../org/apache/james/util/ReactorUtilsTest.java    | 176 +++++++++++++++++++++
 3 files changed, 182 insertions(+), 1 deletion(-)

diff --git a/server/container/util/pom.xml b/server/container/util/pom.xml
index 4b650c6..ab545bb 100644
--- a/server/container/util/pom.xml
+++ b/server/container/util/pom.xml
@@ -80,6 +80,11 @@
             <artifactId>commons-lang3</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.awaitility</groupId>
+            <artifactId>awaitility</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>org.mockito</groupId>
             <artifactId>mockito-core</artifactId>
             <scope>test</scope>
diff --git a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
index 38e9908..92877ff 100644
--- a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
+++ b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
@@ -54,7 +54,7 @@ public class ReactorUtils {
             Preconditions.checkArgument(!duration.isZero(), "'windowDuration' must be strictly positive");
 
             return flux -> flux
-                .windowTimeout(elements, duration)
+                .window(elements)
                 .zipWith(Flux.interval(DELAY, duration))
                 .flatMap(Tuple2::getT1, elements, elements)
                 .flatMap(operation, elements)
diff --git a/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java b/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
index 58960c3..20f33c5 100644
--- a/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
+++ b/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
@@ -21,6 +21,7 @@ package org.apache.james.util;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatCode;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.awaitility.Duration.ONE_SECOND;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
@@ -29,12 +30,15 @@ import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.util.List;
+import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.RandomStringUtils;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Nested;
 import org.junit.jupiter.api.Test;
 import org.reactivestreams.Publisher;
@@ -203,6 +207,178 @@ class ReactorUtilsTest {
             assertThat(results)
                 .containsExactly(0, 1, 2, 3, 4, 6, 7, 8, 9);
         }
+
+        @Test
+        void throttleShouldHandleLargeFluxes() {
+            int windowMaxSize = 2;
+            Duration windowDuration = Duration.ofMillis(1);
+
+            Flux<Integer> originalFlux = Flux.range(0, 10000);
+
+            assertThatCode(() -> originalFlux
+                .transform(ReactorUtils.<Integer, Integer>throttle()
+                    .elements(windowMaxSize)
+                    .per(windowDuration)
+                    .forOperation(Mono::just))
+                .blockLast()).doesNotThrowAnyException();
+        }
+
+        @Test
+        void throttleShouldGenerateSmallerWindowsWhenUpstreamIsSlow() {
+            int windowMaxSize = 3;
+            Duration windowDuration = Duration.ofMillis(20);
+            Stopwatch stopwatch = Stopwatch.createUnstarted();
+
+            Flux<Long> originalFlux = Flux.interval(Duration.ofMillis(10));
+
+            ImmutableList<Long> perWindowCount = originalFlux
+                .transform(ReactorUtils.<Long, Long>throttle()
+                    .elements(windowMaxSize)
+                    .per(windowDuration)
+                    .forOperation(i -> Mono.fromCallable(() -> stopwatch.elapsed(TimeUnit.MILLISECONDS))))
+                .map(i -> i / 20)
+                .doOnSubscribe(signal -> stopwatch.start())
+                .take(10)
+                .groupBy(Function.identity())
+                .flatMap(Flux::count)
+                .collect(Guavate.toImmutableList())
+                .block();
+
+            // We verify that we generate at most 2 elements per slice and not 3
+            // (as the upstream cannot generate more than 2 elements per window)
+            assertThat(perWindowCount)
+                .allSatisfy(count -> assertThat(count).isLessThanOrEqualTo(2));
+        }
+
+        @Test
+        void throttleShouldNotDropEntriesWhenUpstreamIsSlow() {
+            int windowMaxSize = 3;
+            Duration windowDuration = Duration.ofMillis(20);
+
+            Flux<Long> originalFlux = Flux.interval(Duration.ofMillis(10));
+
+            ImmutableList<Long> results = originalFlux
+                .transform(ReactorUtils.<Long, Long>throttle()
+                    .elements(windowMaxSize)
+                    .per(windowDuration)
+                    .forOperation(Mono::just))
+                .take(10)
+                .collect(Guavate.toImmutableList())
+                .block();
+
+            assertThat(results).containsExactly(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L);
+        }
+
+        @Test
+        void throttleShouldCompleteWhenOriginalFluxDoNotFillAWindow() {
+            int windowMaxSize = 3;
+            Duration windowDuration = Duration.ofMillis(20);
+
+            Flux<Long> originalFlux = Flux.just(0L, 1L);
+
+            ImmutableList<Long> results = originalFlux
+                .transform(ReactorUtils.<Long, Long>throttle()
+                    .elements(windowMaxSize)
+                    .per(windowDuration)
+                    .forOperation(Mono::just))
+                .take(10)
+                .collect(Guavate.toImmutableList())
+                .block();
+
+            assertThat(results).containsExactly(0L, 1L);
+        }
+
+        @Test
+        void throttleShouldSupportEmittingPartiallyCompleteWindowImmediately() {
+            int windowMaxSize = 3;
+            Duration windowDuration = Duration.ofMillis(20);
+
+            ConcurrentLinkedDeque<Long> results = new ConcurrentLinkedDeque<>();
+            Flux<Long> originalFlux = Flux.concat(Flux.just(0L, 1L),
+                Flux.never());
+
+            originalFlux
+                .transform(ReactorUtils.<Long, Long>throttle()
+                    .elements(windowMaxSize)
+                    .per(windowDuration)
+                    .forOperation(i -> {
+                        results.add(i);
+                        return Mono.just(i);
+                    }))
+                .subscribeOn(Schedulers.elastic())
+                .subscribe();
+
+            Awaitility.await().atMost(ONE_SECOND)
+                .untilAsserted(() -> assertThat(results).containsExactly(0L, 1L));
+        }
+
+        @Test
+        void throttleShouldTolerateSeveralEmptySlices() {
+            int windowMaxSize = 3;
+            Duration windowDuration = Duration.ofMillis(5);
+
+            // 150 ms = 30 * window duration (which is smaller than reactor small buffers)
+            Flux<Long> originalFlux = Flux.concat(Flux.just(0L, 1L),
+                Mono.delay(Duration.ofMillis(150)).thenReturn(2L));
+
+            List<Long> results = originalFlux
+                .transform(ReactorUtils.<Long, Long>throttle()
+                    .elements(windowMaxSize)
+                    .per(windowDuration)
+                    .forOperation(Mono::just))
+                .collectList()
+                .block();
+
+            System.out.println(results);
+            assertThat(results).containsExactly(0L, 1L, 2L);
+        }
+
+        @Test
+        void throttleShouldTolerateManyEmptySuccessiveWindows() {
+            Hooks.onOperatorDebug();
+
+            int windowMaxSize = 3;
+            Duration windowDuration = Duration.ofMillis(5);
+
+            // 165 ms = 33 * window duration (which is greater than reactor small buffers)
+            Flux<Long> originalFlux = Flux.concat(Flux.just(0L, 1L),
+                Mono.delay(Duration.ofMillis(165)).thenReturn(2L));
+
+            List<Long> results = originalFlux
+                .transform(ReactorUtils.<Long, Long>throttle()
+                    .elements(windowMaxSize)
+                    .per(windowDuration)
+                    .forOperation(Mono::just))
+                .collectList()
+                .block();
+
+            System.out.println(results);
+            assertThat(results).containsExactly(0L, 1L, 2L);
+        }
+
+        @Disabled("reactor.core.Exceptions$OverflowException: Could not emit tick 32 due to lack of requests (interval " +
+            "doesn't support small downstream requests that replenish slower than the ticks)")
+        @Test
+        void throttleShouldTolerateManyEmptyWindows() {
+            int windowMaxSize = 3;
+            Duration windowDuration = Duration.ofMillis(5);
+
+            // 150 ms = 30 * window duration (which is smaller than reactor small buffers)
+            Flux<Long> originalFlux = Flux.concat(Flux.just(0L, 1L),
+                Mono.delay(Duration.ofMillis(150)).thenReturn(2L),
+                Mono.delay(Duration.ofMillis(150)).thenReturn(3L));
+
+            List<Long> results = originalFlux
+                .transform(ReactorUtils.<Long, Long>throttle()
+                    .elements(windowMaxSize)
+                    .per(windowDuration)
+                    .forOperation(Mono::just))
+                .collectList()
+                .block();
+
+            System.out.println(results);
+            assertThat(results).containsExactly(0L, 1L, 2L, 3L);
+        }
     }
 
     @Nested


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org