You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/07/03 02:04:05 UTC

[james-project] 10/15: JAMES-3184 Throttling should survive errors

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 6a8e0d402d3d91727bc34c2bd2719b3aeab0a87b
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Tue Jun 30 15:29:44 2020 +0700

    JAMES-3184 Throttling should survive errors
    
    fixup! JAMES-3184 Throttling should survive errors
---
 .../main/java/org/apache/james/util/ReactorUtils.java |  8 ++++++--
 .../java/org/apache/james/util/ReactorUtilsTest.java  | 19 +++++++++++++++++++
 2 files changed, 25 insertions(+), 2 deletions(-)

diff --git a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
index 5ec7b9d..b516b8e 100644
--- a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
+++ b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
@@ -52,10 +52,14 @@ public class ReactorUtils {
             Preconditions.checkArgument(!duration.isZero(), "'windowDuration' must be strictly positive");
 
             return flux -> flux
+                .onErrorContinue((e, o) -> LOGGER.error("Error encountered while generating throttled entries", e))
                 .window(elements)
                 .delayElements(duration)
-                .concatMap(window -> window.flatMap(operation))
-                .onErrorContinue((e, o) -> LOGGER.error("Error encountered while throttling for {}", o.toString(), e));
+                .concatMap(window -> window.flatMap(operation)
+                    .onErrorResume(e -> {
+                        LOGGER.error("Error encountered while throttling", e);
+                        return Mono.empty();
+                    }));
         };
     }
 
diff --git a/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java b/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
index 0ffbc1a..f7fc589 100644
--- a/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
+++ b/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
@@ -209,6 +209,25 @@ class ReactorUtilsTest {
         }
 
         @Test
+        void throttleShouldNotOverwriteErrorHandling() {
+            int windowMaxSize = 3;
+            Duration windowDuration = Duration.ofMillis(20);
+
+            Flux<Long> originalFlux = Flux.just(0L);
+            ConcurrentLinkedDeque<Throwable> recordedExceptions = new ConcurrentLinkedDeque<>();
+
+            originalFlux
+                .transform(ReactorUtils.<Long, Long>throttle()
+                    .elements(windowMaxSize)
+                    .per(windowDuration)
+                    .forOperation(any -> Mono.<Long>error(new RuntimeException())
+                        .onErrorResume(e -> Mono.fromRunnable(() -> recordedExceptions.add(e)).thenReturn(any))))
+                .blockLast();
+
+            assertThat(recordedExceptions).hasSize(1);
+        }
+
+        @Test
         void throttleShouldHandleLargeFluxes() {
             int windowMaxSize = 2;
             Duration windowDuration = Duration.ofMillis(1);


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org