You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2023/04/19 01:25:36 UTC

[james-project] branch master updated: JAMES-3900 Ignore errors upon task manager polling updates (#1523)

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/master by this push:
     new bc88192963 JAMES-3900 Ignore errors upon task manager polling updates (#1523)
bc88192963 is described below

commit bc8819296357ccbd7c253226b4bd568996771a80
Author: Benoit TELLIER <bt...@linagora.com>
AuthorDate: Wed Apr 19 08:25:30 2023 +0700

    JAMES-3900 Ignore errors upon task manager polling updates (#1523)
---
 .../apache/james/task/SerialTaskManagerWorker.java |  7 +++++-
 .../james/task/SerialTaskManagerWorkerTest.java    | 28 ++++++++++++++++++++++
 2 files changed, 34 insertions(+), 1 deletion(-)

diff --git a/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java b/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
index 66b114c870..4b3f8c5e33 100644
--- a/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
+++ b/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
@@ -123,7 +123,12 @@ public class SerialTaskManagerWorker implements TaskManagerWorker {
             .delayElement(pollingInterval, Schedulers.parallel())
             .repeat()
             .handle(publishIfPresent())
-            .flatMap(information -> Mono.from(listener.updated(taskWithId.getId(), Mono.just(information))).thenReturn(information), DEFAULT_CONCURRENCY);
+            .flatMap(information -> Mono.from(listener.updated(taskWithId.getId(), Mono.just(information)))
+                .thenReturn(information)
+                .onErrorResume(e -> {
+                    LOGGER.error("Error upon polling additional information updates", e);
+                    return Mono.empty();
+                }), DEFAULT_CONCURRENCY);
     }
 
 
diff --git a/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java b/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java
index eaad5c0764..2fdbb72695 100644
--- a/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java
+++ b/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java
@@ -23,6 +23,7 @@ import static org.awaitility.Durations.TEN_SECONDS;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.ArgumentMatchers.notNull;
+import static org.mockito.Mockito.atLeast;
 import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.atMost;
 import static org.mockito.Mockito.mock;
@@ -40,6 +41,8 @@ import org.awaitility.Awaitility;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.mockito.ArgumentCaptor;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 import org.reactivestreams.Publisher;
 
 import reactor.core.publisher.Flux;
@@ -127,6 +130,31 @@ class SerialTaskManagerWorkerTest {
         verify(listener, atMost(3)).updated(eq(taskWithId.getId()), notNull());
     }
 
+    @Test
+    void errorUponUpdatesShouldNotAbortTheRunningTaskPolledUpdates() {
+        AtomicInteger updatedCounter = new AtomicInteger(0);
+        when(listener.updated(any(), any())).thenAnswer(new Answer<Mono<Void>>() {
+            @Override
+            public Mono<Void> answer(InvocationOnMock invocationOnMock) {
+                if (updatedCounter.getAndIncrement() == 1) {
+                    return Mono.error(new RuntimeException());
+                }
+                return Mono.empty();
+            }
+        });
+
+        TaskWithId taskWithId = new TaskWithId(TaskId.generateTaskId(), new MemoryReferenceWithCounterTask((counter) ->
+            Mono.fromCallable(counter::incrementAndGet)
+                .delayElement(Duration.ofMillis(1))
+                .repeat(600)
+                .then(Mono.just(Task.Result.COMPLETED))
+                .block()));
+
+        worker.executeTask(taskWithId).block();
+
+        verify(listener, atLeast(3)).updated(eq(taskWithId.getId()), notNull());
+    }
+
     @Test
     void aFailedTaskShouldCompleteWithFailedStatus() {
         ArgumentCaptor<Publisher<Optional<TaskExecutionDetails.AdditionalInformation>>> additionalInformationPublisherCapture = ArgumentCaptor.forClass(Publisher.class);


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org