You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2023/04/19 01:25:36 UTC
[james-project] branch master updated: JAMES-3900 Ignore errors upon task manager polling updates (#1523)
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push:
new bc88192963 JAMES-3900 Ignore errors upon task manager polling updates (#1523)
bc88192963 is described below
commit bc8819296357ccbd7c253226b4bd568996771a80
Author: Benoit TELLIER <bt...@linagora.com>
AuthorDate: Wed Apr 19 08:25:30 2023 +0700
JAMES-3900 Ignore errors upon task manager polling updates (#1523)
---
.../apache/james/task/SerialTaskManagerWorker.java | 7 +++++-
.../james/task/SerialTaskManagerWorkerTest.java | 28 ++++++++++++++++++++++
2 files changed, 34 insertions(+), 1 deletion(-)
diff --git a/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java b/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
index 66b114c870..4b3f8c5e33 100644
--- a/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
+++ b/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
@@ -123,7 +123,12 @@ public class SerialTaskManagerWorker implements TaskManagerWorker {
.delayElement(pollingInterval, Schedulers.parallel())
.repeat()
.handle(publishIfPresent())
- .flatMap(information -> Mono.from(listener.updated(taskWithId.getId(), Mono.just(information))).thenReturn(information), DEFAULT_CONCURRENCY);
+ .flatMap(information -> Mono.from(listener.updated(taskWithId.getId(), Mono.just(information)))
+ .thenReturn(information)
+ .onErrorResume(e -> {
+ LOGGER.error("Error upon polling additional information updates", e);
+ return Mono.empty();
+ }), DEFAULT_CONCURRENCY);
}
diff --git a/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java b/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java
index eaad5c0764..2fdbb72695 100644
--- a/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java
+++ b/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java
@@ -23,6 +23,7 @@ import static org.awaitility.Durations.TEN_SECONDS;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.notNull;
+import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.atMost;
import static org.mockito.Mockito.mock;
@@ -40,6 +41,8 @@ import org.awaitility.Awaitility;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
@@ -127,6 +130,31 @@ class SerialTaskManagerWorkerTest {
verify(listener, atMost(3)).updated(eq(taskWithId.getId()), notNull());
}
+ @Test
+ void errorUponUpdatesShouldNotAbortTheRunningTaskPolledUpdates() {
+ AtomicInteger updatedCounter = new AtomicInteger(0);
+ when(listener.updated(any(), any())).thenAnswer(new Answer<Mono<Void>>() {
+ @Override
+ public Mono<Void> answer(InvocationOnMock invocationOnMock) {
+ if (updatedCounter.getAndIncrement() == 1) {
+ return Mono.error(new RuntimeException());
+ }
+ return Mono.empty();
+ }
+ });
+
+ TaskWithId taskWithId = new TaskWithId(TaskId.generateTaskId(), new MemoryReferenceWithCounterTask((counter) ->
+ Mono.fromCallable(counter::incrementAndGet)
+ .delayElement(Duration.ofMillis(1))
+ .repeat(600)
+ .then(Mono.just(Task.Result.COMPLETED))
+ .block()));
+
+ worker.executeTask(taskWithId).block();
+
+ verify(listener, atLeast(3)).updated(eq(taskWithId.getId()), notNull());
+ }
+
@Test
void aFailedTaskShouldCompleteWithFailedStatus() {
ArgumentCaptor<Publisher<Optional<TaskExecutionDetails.AdditionalInformation>>> additionalInformationPublisherCapture = ArgumentCaptor.forClass(Publisher.class);
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org