You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/05/13 12:07:17 UTC
[james-project] 08/15: JAMES-3172 We cannot cancel computation
started by Reactor
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit dc46a4497d0b9b7c1be16b476dafa347c921ff40
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Thu May 7 13:31:06 2020 +0700
JAMES-3172 We cannot cancel computation started by Reactor
---
.../james/task/SerialTaskManagerWorkerTest.java | 37 +++++++++++++++++++++-
1 file changed, 36 insertions(+), 1 deletion(-)
diff --git a/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java b/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java
index 5c4f23d..0065546 100644
--- a/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java
+++ b/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java
@@ -25,7 +25,6 @@ import static org.mockito.ArgumentMatchers.notNull;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.atMost;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
@@ -40,8 +39,10 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
class SerialTaskManagerWorkerTest {
@@ -169,6 +170,40 @@ class SerialTaskManagerWorkerTest {
latch.countDown();
}
+ @Disabled("JAMES-3172 We cannot cancel computation started by Reactor")
+ @Test
+ void taskExecutingReactivelyShouldStopExecutionUponCancel() throws InterruptedException {
+ // Provide a task ticking every 100ms in a separate reactor thread
+ AtomicInteger tickCount = new AtomicInteger();
+ int tikIntervalInMs = 100;
+ MemoryReferenceTask tickTask = new MemoryReferenceTask(() -> Flux.interval(Duration.ofMillis(tikIntervalInMs))
+ .flatMap(any -> Mono.fromCallable(() -> {
+ tickCount.incrementAndGet();
+ return Task.Result.COMPLETED;
+ }))
+ .reduce(Task::combine)
+ .thenReturn(Task.Result.COMPLETED)
+ .block());
+
+ // Execute the task
+ TaskId id = TaskId.generateTaskId();
+ TaskWithId taskWithId = new TaskWithId(id, tickTask);
+ Mono<Task.Result> resultMono = worker.executeTask(taskWithId).cache();
+ resultMono.subscribe();
+ Awaitility.waitAtMost(org.awaitility.Duration.TEN_SECONDS)
+ .untilAsserted(() -> verify(listener, atLeastOnce()).started(id));
+
+ worker.cancelTask(id);
+
+ Thread.sleep(tikIntervalInMs);
+
+ int tikCountSnapshot1 = tickCount.get();
+ Thread.sleep(2 * tikIntervalInMs);
+ int tikCountSnapshot2 = tickCount.get();
+ // If the task had effectively been canceled tikCount should no longer be incremented
+ assertThat(tikCountSnapshot1).isEqualTo(tikCountSnapshot2);
+ }
+
@Test
void theWorkerShouldCancelAnInProgressTask() throws InterruptedException {
TaskId id = TaskId.generateTaskId();
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org