You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ro...@apache.org on 2019/10/18 06:40:46 UTC

[james-project] 13/17: JAMES-2813 change polling intervall and make it a parameter of the worker

This is an automated email from the ASF dual-hosted git repository.

rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit e1ff8c54a13a4609c1d42cae9c001da2e1ef36c0
Author: RĂ©mi KOWALSKI <rk...@linagora.com>
AuthorDate: Thu Oct 17 10:55:50 2019 +0200

    JAMES-2813 change polling intervall and make it a parameter of the worker
---
 .../java/org/apache/james/task/TaskManagerContract.java     |  2 +-
 .../distributed/RabbitMQWorkQueueSupplier.scala             | 13 +++++++++++--
 .../distributed/DistributedTaskManagerTest.java             |  3 +--
 .../main/java/org/apache/james/task/MemoryTaskManager.java  |  3 ++-
 .../java/org/apache/james/task/SerialTaskManagerWorker.java |  6 ++++--
 .../org/apache/james/task/SerialTaskManagerWorkerTest.java  |  8 +++++---
 .../task/eventsourcing/EventSourcingTaskManagerTest.java    |  2 +-
 7 files changed, 25 insertions(+), 12 deletions(-)

diff --git a/server/task/task-api/src/test/java/org/apache/james/task/TaskManagerContract.java b/server/task/task-api/src/test/java/org/apache/james/task/TaskManagerContract.java
index d482eed..406671f 100644
--- a/server/task/task-api/src/test/java/org/apache/james/task/TaskManagerContract.java
+++ b/server/task/task-api/src/test/java/org/apache/james/task/TaskManagerContract.java
@@ -38,7 +38,7 @@ import org.hamcrest.Matchers;
 import org.junit.jupiter.api.Test;
 
 public interface TaskManagerContract {
-
+    java.time.Duration UPDATE_INFORMATION_POLLING_INTERVAL = java.time.Duration.ofSeconds(1);
     Duration slowPacedPollInterval = ONE_HUNDRED_MILLISECONDS;
     ConditionFactory calmlyAwait = Awaitility.with()
         .pollInterval(slowPacedPollInterval)
diff --git a/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueSupplier.scala b/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueSupplier.scala
index a6c468c..f86be44 100644
--- a/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueSupplier.scala
+++ b/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueSupplier.scala
@@ -18,8 +18,10 @@
  * ***************************************************************/
 package org.apache.james.task.eventsourcing.distributed
 
-import javax.inject.Inject
+import java.time.Duration
 
+import com.google.common.annotations.VisibleForTesting
+import javax.inject.Inject
 import org.apache.james.backends.rabbitmq.SimpleConnectionPool
 import org.apache.james.eventsourcing.EventSourcingSystem
 import org.apache.james.server.task.json.JsonTaskSerializer
@@ -28,9 +30,16 @@ import org.apache.james.task.eventsourcing.{WorkQueueSupplier, WorkerStatusListe
 
 class RabbitMQWorkQueueSupplier @Inject()(private val rabbitMQConnectionPool: SimpleConnectionPool,
                                 private val jsonTaskSerializer: JsonTaskSerializer) extends WorkQueueSupplier {
+
+  val DEFAULT_ADDITIONAL_INFORMATION_POLLING_INTERVAL =  Duration.ofSeconds(30)
   override def apply(eventSourcingSystem: EventSourcingSystem): RabbitMQWorkQueue = {
+     apply(eventSourcingSystem, DEFAULT_ADDITIONAL_INFORMATION_POLLING_INTERVAL)
+  }
+
+  @VisibleForTesting
+  def apply(eventSourcingSystem: EventSourcingSystem, additionalInformationPollingInterval: Duration): RabbitMQWorkQueue = {
     val listener = WorkerStatusListener(eventSourcingSystem)
-    val worker = new SerialTaskManagerWorker(listener)
+    val worker = new SerialTaskManagerWorker(listener, additionalInformationPollingInterval)
     val rabbitMQWorkQueue = new RabbitMQWorkQueue(worker, rabbitMQConnectionPool, jsonTaskSerializer)
     rabbitMQWorkQueue.start()
     rabbitMQWorkQueue
diff --git a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
index 6774b67..3f4be63 100644
--- a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
+++ b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
@@ -68,7 +68,6 @@ import org.awaitility.Duration;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
 import com.github.steveash.guavate.Guavate;
@@ -86,7 +85,7 @@ class DistributedTaskManagerTest implements TaskManagerContract {
 
         @Override
         public WorkQueue apply(EventSourcingSystem eventSourcingSystem) {
-            RabbitMQWorkQueue workQueue = supplier.apply(eventSourcingSystem);
+            RabbitMQWorkQueue workQueue = supplier.apply(eventSourcingSystem, UPDATE_INFORMATION_POLLING_INTERVAL);
             workQueues.add(workQueue);
             return workQueue;
         }
diff --git a/server/task/task-memory/src/main/java/org/apache/james/task/MemoryTaskManager.java b/server/task/task-memory/src/main/java/org/apache/james/task/MemoryTaskManager.java
index 56ffd2d..64f60e9 100644
--- a/server/task/task-memory/src/main/java/org/apache/james/task/MemoryTaskManager.java
+++ b/server/task/task-memory/src/main/java/org/apache/james/task/MemoryTaskManager.java
@@ -93,6 +93,7 @@ public class MemoryTaskManager implements TaskManager {
         }
     }
 
+    private static final Duration UPDATE_INFORMATION_POLLING_DURATION = Duration.ofSeconds(5);
     private static final Duration AWAIT_POLLING_DURATION = Duration.ofMillis(500);
     public static final Duration NOW = Duration.ZERO;
 
@@ -105,7 +106,7 @@ public class MemoryTaskManager implements TaskManager {
     public MemoryTaskManager(Hostname hostname) {
         this.hostname = hostname;
         this.idToExecutionDetails = new ConcurrentHashMap<>();
-        this.worker = new SerialTaskManagerWorker(updater());
+        this.worker = new SerialTaskManagerWorker(updater(), UPDATE_INFORMATION_POLLING_DURATION);
         workQueue = new MemoryWorkQueue(worker);
     }
 
diff --git a/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java b/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
index c296bf1..34f4548 100644
--- a/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
+++ b/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
@@ -52,8 +52,10 @@ public class SerialTaskManagerWorker implements TaskManagerWorker {
     private final AtomicReference<Tuple2<TaskId, Future<?>>> runningTask;
     private final Semaphore semaphore;
     private final Set<TaskId> cancelledTasks;
+    private final Duration pollingInterval;
 
-    public SerialTaskManagerWorker(Listener listener) {
+    public SerialTaskManagerWorker(Listener listener, Duration pollingInterval) {
+        this.pollingInterval = pollingInterval;
         this.taskExecutor = Executors.newSingleThreadExecutor(NamedThreadFactory.withName("task executor"));
         this.listener = listener;
         this.cancelledTasks = Sets.newConcurrentHashSet();
@@ -112,7 +114,7 @@ public class SerialTaskManagerWorker implements TaskManagerWorker {
 
     private Flux<TaskExecutionDetails.AdditionalInformation> pollAdditionalInformation(TaskWithId taskWithId) {
         return Mono.fromCallable(() -> taskWithId.getTask().details())
-            .delayElement(Duration.ofSeconds(1), Schedulers.boundedElastic())
+            .delayElement(pollingInterval, Schedulers.boundedElastic())
             .repeat()
             .flatMap(Mono::justOrEmpty)
             .doOnNext(information -> listener.updated(taskWithId.getId(), information));
diff --git a/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java b/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java
index b693347..a83392f 100644
--- a/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java
+++ b/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java
@@ -44,6 +44,8 @@ import org.junit.jupiter.api.Test;
 import reactor.core.publisher.Mono;
 
 class SerialTaskManagerWorkerTest {
+    private  static final Duration UPDATE_INFORMATION_POLLING_DURATION = Duration.ofSeconds(1);
+
     private TaskManagerWorker.Listener listener;
     private SerialTaskManagerWorker worker;
 
@@ -54,7 +56,7 @@ class SerialTaskManagerWorkerTest {
     @BeforeEach
     void beforeEach() {
         listener = mock(TaskManagerWorker.Listener.class);
-        worker = new SerialTaskManagerWorker(listener);
+        worker = new SerialTaskManagerWorker(listener, UPDATE_INFORMATION_POLLING_DURATION);
     }
 
     @AfterEach
@@ -94,13 +96,13 @@ class SerialTaskManagerWorkerTest {
         TaskWithId taskWithId = new TaskWithId(TaskId.generateTaskId(), new MemoryReferenceWithCounterTask((counter) ->
             Mono.fromCallable(counter::incrementAndGet)
                 .delayElement(Duration.ofSeconds(1))
-                .repeat(2)
+                .repeat(3)
                 .then(Mono.just(Task.Result.COMPLETED))
                 .block()));
 
         worker.executeTask(taskWithId).block();
 
-        verify(listener, atMost(3)).updated(eq(taskWithId.getId()), notNull());
+        verify(listener, atMost(4)).updated(eq(taskWithId.getId()), notNull());
     }
 
     @Test
diff --git a/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java b/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java
index d3d9653..05ef373 100644
--- a/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java
+++ b/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java
@@ -58,7 +58,7 @@ class EventSourcingTaskManagerTest implements TaskManagerContract {
         TaskExecutionDetailsProjection executionDetailsProjection = new MemoryTaskExecutionDetailsProjection();
         WorkQueueSupplier workQueueSupplier = eventSourcingSystem -> {
             WorkerStatusListener listener = new WorkerStatusListener(eventSourcingSystem);
-            TaskManagerWorker worker = new SerialTaskManagerWorker(listener);
+            TaskManagerWorker worker = new SerialTaskManagerWorker(listener, UPDATE_INFORMATION_POLLING_INTERVAL);
             return new MemoryWorkQueue(worker);
         };
         taskManager = new EventSourcingTaskManager(workQueueSupplier, eventStore, executionDetailsProjection, HOSTNAME, new MemoryTerminationSubscriber());


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org