You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/07/03 02:04:04 UTC

[james-project] 09/15: JAMES-3184 Throttling should tolerate many empty windows

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit f7774766844301d89e3e31ba469c2a4af4899a8d
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Tue Jun 30 15:29:18 2020 +0700

    JAMES-3184 Throttling should tolerate many empty windows
    
    Previous version failed in zipWith with this error:
    
    ```
    reactor.core.Exceptions$OverflowException: Could not emit tick 32 due
    to lack of requests (interval doesn't support small downstream requests
    that replenish slower than the ticks)
    ```
    
    We avoid a "hard" failure, but we no longer have a timeout on the original
    windows, which sounds like an acceptable tradeoff.
---
 .../src/main/java/org/apache/james/util/ReactorUtils.java   |  7 ++-----
 .../test/java/org/apache/james/util/ReactorUtilsTest.java   | 13 ++++++-------
 2 files changed, 8 insertions(+), 12 deletions(-)

diff --git a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
index 92877ff..5ec7b9d 100644
--- a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
+++ b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
@@ -40,12 +40,10 @@ import reactor.core.publisher.Mono;
 import reactor.core.publisher.Signal;
 import reactor.core.publisher.SynchronousSink;
 import reactor.util.context.Context;
-import reactor.util.function.Tuple2;
 
 public class ReactorUtils {
     private static final Logger LOGGER = LoggerFactory.getLogger(ReactorUtils.class);
     public static final String MDC_KEY_PREFIX = "MDC-";
-    private static final Duration DELAY = Duration.ZERO;
 
     public static <T, U> RequiresQuantity<T, U> throttle() {
         return elements -> duration -> operation -> {
@@ -55,9 +53,8 @@ public class ReactorUtils {
 
             return flux -> flux
                 .window(elements)
-                .zipWith(Flux.interval(DELAY, duration))
-                .flatMap(Tuple2::getT1, elements, elements)
-                .flatMap(operation, elements)
+                .delayElements(duration)
+                .concatMap(window -> window.flatMap(operation))
                 .onErrorContinue((e, o) -> LOGGER.error("Error encountered while throttling for {}", o.toString(), e));
         };
     }
diff --git a/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java b/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
index 20f33c5..0ffbc1a 100644
--- a/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
+++ b/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
@@ -50,7 +50,6 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.primitives.Bytes;
 
 import reactor.core.publisher.Flux;
-import reactor.core.publisher.Hooks;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 
@@ -112,8 +111,9 @@ class ReactorUtilsTest {
                 .collect(Guavate.toImmutableList())
                 .block();
 
+            // delayElements also delays the first element
             assertThat(windowMembership)
-                .containsExactly(0L, 0L, 0L, 1L, 1L, 1L, 2L, 2L, 2L, 3L);
+                .containsExactly(1L, 1L, 1L, 2L, 2L, 2L, 3L, 3L, 3L, 4L);
         }
 
         @Test
@@ -223,6 +223,9 @@ class ReactorUtilsTest {
                 .blockLast()).doesNotThrowAnyException();
         }
 
+        @Disabled("We no longer rely on 'windowTimeout', this breakage is expected." +
+            "'windowTimeout' solves this but create other, more critical issues (large flux cannot be throttled" +
+            "as described in https://github.com/reactor/reactor-core/issues/1099")
         @Test
         void throttleShouldGenerateSmallerWindowsWhenUpstreamIsSlow() {
             int windowMaxSize = 3;
@@ -270,7 +273,7 @@ class ReactorUtilsTest {
         }
 
         @Test
-        void throttleShouldCompleteWhenOriginalFluxDoNotFillAWindow() {
+        void throttleShouldCompleteWhenOriginalFluxDoesNotFillAWindow() {
             int windowMaxSize = 3;
             Duration windowDuration = Duration.ofMillis(20);
 
@@ -335,8 +338,6 @@ class ReactorUtilsTest {
 
         @Test
         void throttleShouldTolerateManyEmptySuccessiveWindows() {
-            Hooks.onOperatorDebug();
-
             int windowMaxSize = 3;
             Duration windowDuration = Duration.ofMillis(5);
 
@@ -356,8 +357,6 @@ class ReactorUtilsTest {
             assertThat(results).containsExactly(0L, 1L, 2L);
         }
 
-        @Disabled("reactor.core.Exceptions$OverflowException: Could not emit tick 32 due to lack of requests (interval " +
-            "doesn't support small downstream requests that replenish slower than the ticks)")
         @Test
         void throttleShouldTolerateManyEmptyWindows() {
             int windowMaxSize = 3;


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org