You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ro...@apache.org on 2019/10/18 06:40:46 UTC
[james-project] 13/17: JAMES-2813 change polling interval and make
it a parameter of the worker
This is an automated email from the ASF dual-hosted git repository.
rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit e1ff8c54a13a4609c1d42cae9c001da2e1ef36c0
Author: Rémi KOWALSKI <rk...@linagora.com>
AuthorDate: Thu Oct 17 10:55:50 2019 +0200
JAMES-2813 change polling interval and make it a parameter of the worker
---
.../java/org/apache/james/task/TaskManagerContract.java | 2 +-
.../distributed/RabbitMQWorkQueueSupplier.scala | 13 +++++++++++--
.../distributed/DistributedTaskManagerTest.java | 3 +--
.../main/java/org/apache/james/task/MemoryTaskManager.java | 3 ++-
.../java/org/apache/james/task/SerialTaskManagerWorker.java | 6 ++++--
.../org/apache/james/task/SerialTaskManagerWorkerTest.java | 8 +++++---
.../task/eventsourcing/EventSourcingTaskManagerTest.java | 2 +-
7 files changed, 25 insertions(+), 12 deletions(-)
diff --git a/server/task/task-api/src/test/java/org/apache/james/task/TaskManagerContract.java b/server/task/task-api/src/test/java/org/apache/james/task/TaskManagerContract.java
index d482eed..406671f 100644
--- a/server/task/task-api/src/test/java/org/apache/james/task/TaskManagerContract.java
+++ b/server/task/task-api/src/test/java/org/apache/james/task/TaskManagerContract.java
@@ -38,7 +38,7 @@ import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test;
public interface TaskManagerContract {
-
+ java.time.Duration UPDATE_INFORMATION_POLLING_INTERVAL = java.time.Duration.ofSeconds(1);
Duration slowPacedPollInterval = ONE_HUNDRED_MILLISECONDS;
ConditionFactory calmlyAwait = Awaitility.with()
.pollInterval(slowPacedPollInterval)
diff --git a/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueSupplier.scala b/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueSupplier.scala
index a6c468c..f86be44 100644
--- a/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueSupplier.scala
+++ b/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueSupplier.scala
@@ -18,8 +18,10 @@
* ***************************************************************/
package org.apache.james.task.eventsourcing.distributed
-import javax.inject.Inject
+import java.time.Duration
+import com.google.common.annotations.VisibleForTesting
+import javax.inject.Inject
import org.apache.james.backends.rabbitmq.SimpleConnectionPool
import org.apache.james.eventsourcing.EventSourcingSystem
import org.apache.james.server.task.json.JsonTaskSerializer
@@ -28,9 +30,16 @@ import org.apache.james.task.eventsourcing.{WorkQueueSupplier, WorkerStatusListe
class RabbitMQWorkQueueSupplier @Inject()(private val rabbitMQConnectionPool: SimpleConnectionPool,
private val jsonTaskSerializer: JsonTaskSerializer) extends WorkQueueSupplier {
+
+ val DEFAULT_ADDITIONAL_INFORMATION_POLLING_INTERVAL = Duration.ofSeconds(30)
override def apply(eventSourcingSystem: EventSourcingSystem): RabbitMQWorkQueue = {
+ apply(eventSourcingSystem, DEFAULT_ADDITIONAL_INFORMATION_POLLING_INTERVAL)
+ }
+
+ @VisibleForTesting
+ def apply(eventSourcingSystem: EventSourcingSystem, additionalInformationPollingInterval: Duration): RabbitMQWorkQueue = {
val listener = WorkerStatusListener(eventSourcingSystem)
- val worker = new SerialTaskManagerWorker(listener)
+ val worker = new SerialTaskManagerWorker(listener, additionalInformationPollingInterval)
val rabbitMQWorkQueue = new RabbitMQWorkQueue(worker, rabbitMQConnectionPool, jsonTaskSerializer)
rabbitMQWorkQueue.start()
rabbitMQWorkQueue
diff --git a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
index 6774b67..3f4be63 100644
--- a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
+++ b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
@@ -68,7 +68,6 @@ import org.awaitility.Duration;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;
import com.github.steveash.guavate.Guavate;
@@ -86,7 +85,7 @@ class DistributedTaskManagerTest implements TaskManagerContract {
@Override
public WorkQueue apply(EventSourcingSystem eventSourcingSystem) {
- RabbitMQWorkQueue workQueue = supplier.apply(eventSourcingSystem);
+ RabbitMQWorkQueue workQueue = supplier.apply(eventSourcingSystem, UPDATE_INFORMATION_POLLING_INTERVAL);
workQueues.add(workQueue);
return workQueue;
}
diff --git a/server/task/task-memory/src/main/java/org/apache/james/task/MemoryTaskManager.java b/server/task/task-memory/src/main/java/org/apache/james/task/MemoryTaskManager.java
index 56ffd2d..64f60e9 100644
--- a/server/task/task-memory/src/main/java/org/apache/james/task/MemoryTaskManager.java
+++ b/server/task/task-memory/src/main/java/org/apache/james/task/MemoryTaskManager.java
@@ -93,6 +93,7 @@ public class MemoryTaskManager implements TaskManager {
}
}
+ private static final Duration UPDATE_INFORMATION_POLLING_DURATION = Duration.ofSeconds(5);
private static final Duration AWAIT_POLLING_DURATION = Duration.ofMillis(500);
public static final Duration NOW = Duration.ZERO;
@@ -105,7 +106,7 @@ public class MemoryTaskManager implements TaskManager {
public MemoryTaskManager(Hostname hostname) {
this.hostname = hostname;
this.idToExecutionDetails = new ConcurrentHashMap<>();
- this.worker = new SerialTaskManagerWorker(updater());
+ this.worker = new SerialTaskManagerWorker(updater(), UPDATE_INFORMATION_POLLING_DURATION);
workQueue = new MemoryWorkQueue(worker);
}
diff --git a/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java b/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
index c296bf1..34f4548 100644
--- a/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
+++ b/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
@@ -52,8 +52,10 @@ public class SerialTaskManagerWorker implements TaskManagerWorker {
private final AtomicReference<Tuple2<TaskId, Future<?>>> runningTask;
private final Semaphore semaphore;
private final Set<TaskId> cancelledTasks;
+ private final Duration pollingInterval;
- public SerialTaskManagerWorker(Listener listener) {
+ public SerialTaskManagerWorker(Listener listener, Duration pollingInterval) {
+ this.pollingInterval = pollingInterval;
this.taskExecutor = Executors.newSingleThreadExecutor(NamedThreadFactory.withName("task executor"));
this.listener = listener;
this.cancelledTasks = Sets.newConcurrentHashSet();
@@ -112,7 +114,7 @@ public class SerialTaskManagerWorker implements TaskManagerWorker {
private Flux<TaskExecutionDetails.AdditionalInformation> pollAdditionalInformation(TaskWithId taskWithId) {
return Mono.fromCallable(() -> taskWithId.getTask().details())
- .delayElement(Duration.ofSeconds(1), Schedulers.boundedElastic())
+ .delayElement(pollingInterval, Schedulers.boundedElastic())
.repeat()
.flatMap(Mono::justOrEmpty)
.doOnNext(information -> listener.updated(taskWithId.getId(), information));
diff --git a/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java b/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java
index b693347..a83392f 100644
--- a/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java
+++ b/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java
@@ -44,6 +44,8 @@ import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;
class SerialTaskManagerWorkerTest {
+ private static final Duration UPDATE_INFORMATION_POLLING_DURATION = Duration.ofSeconds(1);
+
private TaskManagerWorker.Listener listener;
private SerialTaskManagerWorker worker;
@@ -54,7 +56,7 @@ class SerialTaskManagerWorkerTest {
@BeforeEach
void beforeEach() {
listener = mock(TaskManagerWorker.Listener.class);
- worker = new SerialTaskManagerWorker(listener);
+ worker = new SerialTaskManagerWorker(listener, UPDATE_INFORMATION_POLLING_DURATION);
}
@AfterEach
@@ -94,13 +96,13 @@ class SerialTaskManagerWorkerTest {
TaskWithId taskWithId = new TaskWithId(TaskId.generateTaskId(), new MemoryReferenceWithCounterTask((counter) ->
Mono.fromCallable(counter::incrementAndGet)
.delayElement(Duration.ofSeconds(1))
- .repeat(2)
+ .repeat(3)
.then(Mono.just(Task.Result.COMPLETED))
.block()));
worker.executeTask(taskWithId).block();
- verify(listener, atMost(3)).updated(eq(taskWithId.getId()), notNull());
+ verify(listener, atMost(4)).updated(eq(taskWithId.getId()), notNull());
}
@Test
diff --git a/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java b/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java
index d3d9653..05ef373 100644
--- a/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java
+++ b/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java
@@ -58,7 +58,7 @@ class EventSourcingTaskManagerTest implements TaskManagerContract {
TaskExecutionDetailsProjection executionDetailsProjection = new MemoryTaskExecutionDetailsProjection();
WorkQueueSupplier workQueueSupplier = eventSourcingSystem -> {
WorkerStatusListener listener = new WorkerStatusListener(eventSourcingSystem);
- TaskManagerWorker worker = new SerialTaskManagerWorker(listener);
+ TaskManagerWorker worker = new SerialTaskManagerWorker(listener, UPDATE_INFORMATION_POLLING_INTERVAL);
return new MemoryWorkQueue(worker);
};
taskManager = new EventSourcingTaskManager(workQueueSupplier, eventStore, executionDetailsProjection, HOSTNAME, new MemoryTerminationSubscriber());
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org