You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2023/02/25 23:59:30 UTC
[james-project] branch master updated: JAMES-3891 Graceful shutdown for queue consumers (#1466)
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push:
new 3f869e3af1 JAMES-3891 Graceful shutdown for queue consumers (#1466)
3f869e3af1 is described below
commit 3f869e3af1bd34c7e9bc06921e91cfb1d73787e5
Author: Benoit TELLIER <bt...@linagora.com>
AuthorDate: Sun Feb 26 06:59:24 2023 +0700
JAMES-3891 Graceful shutdown for queue consumers (#1466)
-> Trash the mail queue last
-> Add a grace period for finishing pending processings upon shutdown.
---
.../java/org/apache/james/GuiceLifecycleHeathCheckTest.java | 4 ++--
.../apache/james/mailetcontainer/impl/JamesMailSpooler.java | 10 ++++++++--
.../org/apache/james/transport/mailets/RemoteDelivery.java | 2 +-
.../transport/mailets/remote/delivery/DeliveryRunnable.java | 5 ++++-
4 files changed, 15 insertions(+), 6 deletions(-)
diff --git a/server/apps/memory-app/src/test/java/org/apache/james/GuiceLifecycleHeathCheckTest.java b/server/apps/memory-app/src/test/java/org/apache/james/GuiceLifecycleHeathCheckTest.java
index 5a257d0161..089856e1de 100644
--- a/server/apps/memory-app/src/test/java/org/apache/james/GuiceLifecycleHeathCheckTest.java
+++ b/server/apps/memory-app/src/test/java/org/apache/james/GuiceLifecycleHeathCheckTest.java
@@ -117,13 +117,13 @@ class GuiceLifecycleHeathCheckTest {
.build();
@Test
- void stoppingJamesServerShouldBeUnhealthy(GuiceJamesServer server) {
+ void stoppingJamesServerShouldBeUnhealthy(GuiceJamesServer server) throws Exception {
Sinks.Empty<Object> sink = Sinks.empty();
try {
configureRequestSpecification(server);
Mono.fromRunnable(server::stop)
- .publishOn(Schedulers.newSingle("test"))
+ .publishOn(Schedulers.boundedElastic())
.subscribe(r -> {}, e -> {}, () -> sink.emitEmpty(FAIL_FAST));
when()
diff --git a/server/mailet/mailetcontainer-impl/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java b/server/mailet/mailetcontainer-impl/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java
index 87133c3dc7..bdb2f7c71a 100644
--- a/server/mailet/mailetcontainer-impl/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java
+++ b/server/mailet/mailetcontainer-impl/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java
@@ -22,6 +22,7 @@ package org.apache.james.mailetcontainer.impl;
import static reactor.core.scheduler.Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE;
import java.io.IOException;
+import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
@@ -189,14 +190,19 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB
public void dispose() {
LOGGER.info("start dispose() ...");
+ LOGGER.info("Cancel queue consumption...");
disposable.dispose();
+ LOGGER.info("Queue consumption canceled, shutting down processor threads...");
+ scheduler.disposeGracefully()
+ .timeout(Duration.ofSeconds(5))
+ .onErrorResume(e -> Mono.empty())
+ .block();
+ LOGGER.info("Thread shutdown completed. Turning off mail queue.");
try {
queue.close();
} catch (IOException e) {
LOGGER.debug("error closing queue", e);
}
- LOGGER.info("thread shutdown completed.");
- scheduler.dispose();
}
public int getCurrentSpoolCount() {
diff --git a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RemoteDelivery.java b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RemoteDelivery.java
index 93f140dcf3..3ec5436c4f 100644
--- a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RemoteDelivery.java
+++ b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RemoteDelivery.java
@@ -256,7 +256,7 @@ public class RemoteDelivery extends GenericMailet {
* service.
*/
@Override
- public synchronized void destroy() {
+ public void destroy() {
if (startThreads == ThreadState.START_THREADS) {
deliveryRunnable.dispose();
}
diff --git a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/remote/delivery/DeliveryRunnable.java b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/remote/delivery/DeliveryRunnable.java
index 7ed0525bf4..7bedc9e551 100644
--- a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/remote/delivery/DeliveryRunnable.java
+++ b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/remote/delivery/DeliveryRunnable.java
@@ -204,7 +204,10 @@ public class DeliveryRunnable implements Disposable {
@Override
public void dispose() {
disposable.dispose();
- remoteDeliveryProcessScheduler.dispose();
remoteDeliveryDequeueScheduler.dispose();
+ remoteDeliveryProcessScheduler.disposeGracefully()
+ .timeout(Duration.ofSeconds(2))
+ .onErrorResume(e -> Mono.empty())
+ .block();
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org