You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by rc...@apache.org on 2020/12/28 07:43:33 UTC
[james-project] 01/16: JAMES-3117 PeriodicalHealthChecks should not fail upon slow checks
This is an automated email from the ASF dual-hosted git repository.
rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 1e66405e5e7f24ca6eda3d6b01288ba4d1ed49c5
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri Dec 25 09:00:20 2020 +0700
JAMES-3117 PeriodicalHealthChecks should not fail upon slow checks
If some checks exceed the reporting period, the interval emits ticks
faster than downstream requests them, and we end up overrunning
intermediate buffers:
```
reactor.core.Exceptions$ErrorCallbackNotImplemented: reactor.core.Exceptions$OverflowException: Could not emit tick 6784 due to lack of requests (interval doesn't support small downstream requests that replenish slower than the ticks)
Caused by: reactor.core.Exceptions$OverflowException: Could not emit tick 6784 due to lack of requests (interval doesn't support small downstream requests that replenish slower than the ticks)
at reactor.core.Exceptions.failWithOverflow(Exceptions.java:234)
at reactor.core.publisher.FluxInterval$IntervalRunnable.run(FluxInterval.java:130)
at reactor.core.scheduler.PeriodicWorkerTask.call(PeriodicWorkerTask.java:59)
at reactor.core.scheduler.PeriodicWorkerTask.run(PeriodicWorkerTask.java:73)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.base/java.util.concurrent.FutureTask.runAndReset(Unknown Source)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.base/java.lang.Thread.run(Unknown Source)
```
It seems safe to treat a check that exceeds the reporting period as
failed (it times out and is reported as unhealthy), which prevents the
buffer overrun.
---
.../java/org/apache/james/PeriodicalHealthChecks.java | 6 +++++-
.../org/apache/james/PeriodicalHealthChecksTest.java | 16 ++++++++++++++++
2 files changed, 21 insertions(+), 1 deletion(-)
diff --git a/server/container/guice/guice-common/src/main/java/org/apache/james/PeriodicalHealthChecks.java b/server/container/guice/guice-common/src/main/java/org/apache/james/PeriodicalHealthChecks.java
index 0f9d1bf..e99d3d4 100644
--- a/server/container/guice/guice-common/src/main/java/org/apache/james/PeriodicalHealthChecks.java
+++ b/server/container/guice/guice-common/src/main/java/org/apache/james/PeriodicalHealthChecks.java
@@ -22,6 +22,7 @@ package org.apache.james;
import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY;
import java.util.Set;
+import java.util.concurrent.TimeoutException;
import javax.annotation.PreDestroy;
import javax.inject.Inject;
@@ -62,7 +63,10 @@ public class PeriodicalHealthChecks implements Startable {
public void start() {
disposable = Flux.interval(configuration.getPeriod(), scheduler)
.flatMapIterable(any -> healthChecks)
- .flatMap(healthCheck -> Mono.from(healthCheck.check()), DEFAULT_CONCURRENCY)
+ .flatMap(healthCheck -> Mono.from(healthCheck.check())
+ .timeout(configuration.getPeriod())
+ .onErrorResume(TimeoutException.class, e -> Mono.just(Result.unhealthy(healthCheck.componentName(), e.getMessage()))),
+ DEFAULT_CONCURRENCY)
.doOnNext(this::logResult)
.onErrorContinue(this::logError)
.subscribeOn(Schedulers.elastic())
diff --git a/server/container/guice/guice-common/src/test/java/org/apache/james/PeriodicalHealthChecksTest.java b/server/container/guice/guice-common/src/test/java/org/apache/james/PeriodicalHealthChecksTest.java
index a325098..98da701 100644
--- a/server/container/guice/guice-common/src/test/java/org/apache/james/PeriodicalHealthChecksTest.java
+++ b/server/container/guice/guice-common/src/test/java/org/apache/james/PeriodicalHealthChecksTest.java
@@ -21,6 +21,7 @@ package org.apache.james;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -100,6 +101,21 @@ public class PeriodicalHealthChecksTest {
}
@Test
+ void healthChecksShouldBeConsideredFailedIfExceedingTimeout() {
+ testee = new PeriodicalHealthChecks(ImmutableSet.of(mockHealthCheck1, mockHealthCheck2),
+ scheduler,
+ new PeriodicalHealthChecksConfiguration(Duration.ofMillis(1)));
+
+ when(mockHealthCheck1.check()).thenReturn(Mono.just(Result.healthy(new ComponentName("mockHealthCheck1"))).delayElement(Duration.ofMillis(10)));
+ when(mockHealthCheck2.check()).thenReturn(Mono.just(Result.healthy(new ComponentName("mockHealthCheck2"))).delayElement(Duration.ofMillis(10)));
+
+ testee.start();
+
+ assertThatCode(() -> scheduler.advanceTimeBy(PERIOD))
+ .doesNotThrowAnyException();
+ }
+
+ @Test
void startShouldCallHealthCheckAtLeastOnce() {
testee.start();
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org