You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by rc...@apache.org on 2020/12/28 07:43:33 UTC

[james-project] 01/16: JAMES-3117 PeriodicalHealthChecks should not fail upon slow checks

This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 1e66405e5e7f24ca6eda3d6b01288ba4d1ed49c5
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri Dec 25 09:00:20 2020 +0700

    JAMES-3117 PeriodicalHealthChecks should not fail upon slow checks
    
    If some checks exceed the reporting period, we end up overrunning
    intermediate buffers.
    
    ```
    reactor.core.Exceptions$ErrorCallbackNotImplemented: reactor.core.Exceptions$OverflowException: Could not emit tick 6784 due to lack of requests (interval doesn't support small downstream requests that replenish slower than the ticks)
    Caused by: reactor.core.Exceptions$OverflowException: Could not emit tick 6784 due to lack of requests (interval doesn't support small downstream requests that replenish slower than the ticks)
    at reactor.core.Exceptions.failWithOverflow(Exceptions.java:234)
    at reactor.core.publisher.FluxInterval$IntervalRunnable.run(FluxInterval.java:130)
    at reactor.core.scheduler.PeriodicWorkerTask.call(PeriodicWorkerTask.java:59)
    at reactor.core.scheduler.PeriodicWorkerTask.run(PeriodicWorkerTask.java:73)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
    at java.base/java.util.concurrent.FutureTask.runAndReset(Unknown Source)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
    ```
    
    It looks safe to treat a check that exceeds the reporting period
    as failed (by timing it out), which prevents the buffer overrun.
---
 .../java/org/apache/james/PeriodicalHealthChecks.java    |  6 +++++-
 .../org/apache/james/PeriodicalHealthChecksTest.java     | 16 ++++++++++++++++
 2 files changed, 21 insertions(+), 1 deletion(-)

diff --git a/server/container/guice/guice-common/src/main/java/org/apache/james/PeriodicalHealthChecks.java b/server/container/guice/guice-common/src/main/java/org/apache/james/PeriodicalHealthChecks.java
index 0f9d1bf..e99d3d4 100644
--- a/server/container/guice/guice-common/src/main/java/org/apache/james/PeriodicalHealthChecks.java
+++ b/server/container/guice/guice-common/src/main/java/org/apache/james/PeriodicalHealthChecks.java
@@ -22,6 +22,7 @@ package org.apache.james;
 import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY;
 
 import java.util.Set;
+import java.util.concurrent.TimeoutException;
 
 import javax.annotation.PreDestroy;
 import javax.inject.Inject;
@@ -62,7 +63,10 @@ public class PeriodicalHealthChecks implements Startable {
     public void start() {
         disposable = Flux.interval(configuration.getPeriod(), scheduler)
             .flatMapIterable(any -> healthChecks)
-            .flatMap(healthCheck -> Mono.from(healthCheck.check()), DEFAULT_CONCURRENCY)
+            .flatMap(healthCheck -> Mono.from(healthCheck.check())
+                .timeout(configuration.getPeriod())
+                .onErrorResume(TimeoutException.class, e -> Mono.just(Result.unhealthy(healthCheck.componentName(), e.getMessage()))),
+                DEFAULT_CONCURRENCY)
             .doOnNext(this::logResult)
             .onErrorContinue(this::logError)
             .subscribeOn(Schedulers.elastic())
diff --git a/server/container/guice/guice-common/src/test/java/org/apache/james/PeriodicalHealthChecksTest.java b/server/container/guice/guice-common/src/test/java/org/apache/james/PeriodicalHealthChecksTest.java
index a325098..98da701 100644
--- a/server/container/guice/guice-common/src/test/java/org/apache/james/PeriodicalHealthChecksTest.java
+++ b/server/container/guice/guice-common/src/test/java/org/apache/james/PeriodicalHealthChecksTest.java
@@ -21,6 +21,7 @@ package org.apache.james;
 
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
 import static org.mockito.Mockito.atLeast;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -100,6 +101,21 @@ public class PeriodicalHealthChecksTest {
     }
 
     @Test
+    void healthChecksShouldBeConsideredFailedIfExceedingTimeout() {
+        testee = new PeriodicalHealthChecks(ImmutableSet.of(mockHealthCheck1, mockHealthCheck2),
+            scheduler,
+            new PeriodicalHealthChecksConfiguration(Duration.ofMillis(1)));
+
+        when(mockHealthCheck1.check()).thenReturn(Mono.just(Result.healthy(new ComponentName("mockHealthCheck1"))).delayElement(Duration.ofMillis(10)));
+        when(mockHealthCheck2.check()).thenReturn(Mono.just(Result.healthy(new ComponentName("mockHealthCheck2"))).delayElement(Duration.ofMillis(10)));
+
+        testee.start();
+
+        assertThatCode(() -> scheduler.advanceTimeBy(PERIOD))
+            .doesNotThrowAnyException();
+    }
+
+    @Test
     void startShouldCallHealthCheckAtLeastOnce() {
         testee.start();
 


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org