You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/05/13 12:07:17 UTC

[james-project] 08/15: JAMES-3172 We cannot cancel computation started by Reactor

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit dc46a4497d0b9b7c1be16b476dafa347c921ff40
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Thu May 7 13:31:06 2020 +0700

    JAMES-3172 We cannot cancel computation started by Reactor
---
 .../james/task/SerialTaskManagerWorkerTest.java    | 37 +++++++++++++++++++++-
 1 file changed, 36 insertions(+), 1 deletion(-)

diff --git a/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java b/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java
index 5c4f23d..0065546 100644
--- a/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java
+++ b/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java
@@ -25,7 +25,6 @@ import static org.mockito.ArgumentMatchers.notNull;
 import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.atMost;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
@@ -40,8 +39,10 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.awaitility.Awaitility;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 class SerialTaskManagerWorkerTest {
@@ -169,6 +170,40 @@ class SerialTaskManagerWorkerTest {
         latch.countDown();
     }
 
+    @Disabled("JAMES-3172 We cannot cancel computation started by Reactor")
+    @Test
+    void taskExecutingReactivelyShouldStopExecutionUponCancel() throws InterruptedException {
+        // Provide a task that blocks on a Reactor pipeline ticking every 100ms in a separate reactor thread; each tick increments tickCount
+        AtomicInteger tickCount = new AtomicInteger();
+        int tikIntervalInMs = 100;
+        MemoryReferenceTask tickTask = new MemoryReferenceTask(() -> Flux.interval(Duration.ofMillis(tikIntervalInMs))
+            .flatMap(any -> Mono.fromCallable(() -> {
+                tickCount.incrementAndGet();
+                return Task.Result.COMPLETED;
+            }))
+            .reduce(Task::combine)
+            .thenReturn(Task.Result.COMPLETED)
+            .block());
+
+        // Execute the task, then wait (up to ten seconds) until the listener has been notified that it started
+        TaskId id = TaskId.generateTaskId();
+        TaskWithId taskWithId = new TaskWithId(id, tickTask);
+        Mono<Task.Result> resultMono = worker.executeTask(taskWithId).cache();
+        resultMono.subscribe();
+        Awaitility.waitAtMost(org.awaitility.Duration.TEN_SECONDS)
+            .untilAsserted(() -> verify(listener, atLeastOnce()).started(id));
+
+        worker.cancelTask(id);
+
+        Thread.sleep(tikIntervalInMs); // presumably a one-interval grace period for the cancellation to take effect - TODO confirm
+
+        int tikCountSnapshot1 = tickCount.get();
+        Thread.sleep(2 * tikIntervalInMs);
+        int tikCountSnapshot2 = tickCount.get();
+        // If the task had effectively been canceled, tickCount should no longer be incremented between the two snapshots
+        assertThat(tikCountSnapshot1).isEqualTo(tikCountSnapshot2);
+    }
+
     @Test
     void theWorkerShouldCancelAnInProgressTask() throws InterruptedException {
         TaskId id = TaskId.generateTaskId();


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org