You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/07/03 02:04:02 UTC

[james-project] 07/15: JAMES-3184 Throttling should survive errors

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 9f514be644512549f08300f344769499bd1ede82
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Tue Jun 30 10:50:00 2020 +0700

    JAMES-3184 Throttling should survive errors
---
 .../java/org/apache/james/util/ReactorUtils.java   |  8 ++--
 .../org/apache/james/util/ReactorUtilsTest.java    | 52 ++++++++++++++++++++++
 2 files changed, 57 insertions(+), 3 deletions(-)

diff --git a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
index 1548d1d..38e9908 100644
--- a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
+++ b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
@@ -30,6 +30,8 @@ import java.util.function.Consumer;
 import java.util.function.Function;
 
 import org.reactivestreams.Publisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
 
@@ -41,9 +43,8 @@ import reactor.util.context.Context;
 import reactor.util.function.Tuple2;
 
 public class ReactorUtils {
-
+    private static final Logger LOGGER = LoggerFactory.getLogger(ReactorUtils.class);
     public static final String MDC_KEY_PREFIX = "MDC-";
-
     private static final Duration DELAY = Duration.ZERO;
 
     public static <T, U> RequiresQuantity<T, U> throttle() {
@@ -56,7 +57,8 @@ public class ReactorUtils {
                 .windowTimeout(elements, duration)
                 .zipWith(Flux.interval(DELAY, duration))
                 .flatMap(Tuple2::getT1, elements, elements)
-                .flatMap(operation, elements);
+                .flatMap(operation, elements)
+                .onErrorContinue((e, o) -> LOGGER.error("Error encountered while throttling for {}", o.toString(), e));
         };
     }
 
diff --git a/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java b/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
index dc286ad..58960c3 100644
--- a/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
+++ b/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
@@ -46,6 +46,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.primitives.Bytes;
 
 import reactor.core.publisher.Flux;
+import reactor.core.publisher.Hooks;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 
@@ -151,6 +152,57 @@ class ReactorUtilsTest {
             assertThat(ongoingProcessingUponComputationStart)
                 .allSatisfy(processingCount -> assertThat(processingCount).isLessThanOrEqualTo(windowMaxSize));
         }
+
+        @Test
+        void throttleShouldNotAbortProcessingUponError() {
+            int windowMaxSize = 3;
+            Duration windowDuration = Duration.ofMillis(100);
+
+            Flux<Integer> originalFlux = Flux.range(0, 10);
+            Function<Integer, Publisher<Integer>> operation =
+                i -> {
+                    if (i == 5) {
+                        return Mono.error(new RuntimeException());
+                    }
+                    return Mono.just(i);
+                };
+
+            List<Integer> results = originalFlux
+                .transform(ReactorUtils.<Integer, Integer>throttle()
+                    .elements(windowMaxSize)
+                    .per(windowDuration)
+                    .forOperation(operation))
+                .collectList()
+                .block();
+
+            assertThat(results)
+                .containsExactly(0, 1, 2, 3, 4, 6, 7, 8, 9);
+        }
+
+        @Test
+        void throttleShouldNotAbortProcessingUponUpstreamError() {
+            int windowMaxSize = 3;
+            Duration windowDuration = Duration.ofMillis(100);
+
+            Flux<Integer> originalFlux = Flux.range(0, 10)
+                .flatMap(i -> {
+                    if (i == 5) {
+                        return Mono.error(new RuntimeException());
+                    }
+                    return Mono.just(i);
+                });
+
+            List<Integer> results = originalFlux
+                .transform(ReactorUtils.<Integer, Integer>throttle()
+                    .elements(windowMaxSize)
+                    .per(windowDuration)
+                    .forOperation(Mono::just))
+                .collectList()
+                .block();
+
+            assertThat(results)
+                .containsExactly(0, 1, 2, 3, 4, 6, 7, 8, 9);
+        }
     }
 
     @Nested


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org