You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2023/02/25 23:59:30 UTC

[james-project] branch master updated: JAMES-3891 Graceful shutdown for queue consumers (#1466)

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/master by this push:
     new 3f869e3af1 JAMES-3891 Graceful shutdown for queue consumers (#1466)
3f869e3af1 is described below

commit 3f869e3af1bd34c7e9bc06921e91cfb1d73787e5
Author: Benoit TELLIER <bt...@linagora.com>
AuthorDate: Sun Feb 26 06:59:24 2023 +0700

    JAMES-3891 Graceful shutdown for queue consumers (#1466)
    
     -> Trash the mail queue last
     -> Add a grace period for finishing pending processings upon shutdown.
---
 .../java/org/apache/james/GuiceLifecycleHeathCheckTest.java    |  4 ++--
 .../apache/james/mailetcontainer/impl/JamesMailSpooler.java    | 10 ++++++++--
 .../org/apache/james/transport/mailets/RemoteDelivery.java     |  2 +-
 .../transport/mailets/remote/delivery/DeliveryRunnable.java    |  5 ++++-
 4 files changed, 15 insertions(+), 6 deletions(-)

diff --git a/server/apps/memory-app/src/test/java/org/apache/james/GuiceLifecycleHeathCheckTest.java b/server/apps/memory-app/src/test/java/org/apache/james/GuiceLifecycleHeathCheckTest.java
index 5a257d0161..089856e1de 100644
--- a/server/apps/memory-app/src/test/java/org/apache/james/GuiceLifecycleHeathCheckTest.java
+++ b/server/apps/memory-app/src/test/java/org/apache/james/GuiceLifecycleHeathCheckTest.java
@@ -117,13 +117,13 @@ class GuiceLifecycleHeathCheckTest {
             .build();
 
         @Test
-        void stoppingJamesServerShouldBeUnhealthy(GuiceJamesServer server) {
+        void stoppingJamesServerShouldBeUnhealthy(GuiceJamesServer server) throws Exception {
             Sinks.Empty<Object> sink = Sinks.empty();
             try {
                 configureRequestSpecification(server);
 
                 Mono.fromRunnable(server::stop)
-                    .publishOn(Schedulers.newSingle("test"))
+                    .publishOn(Schedulers.boundedElastic())
                     .subscribe(r -> {}, e -> {}, () -> sink.emitEmpty(FAIL_FAST));
 
                 when()
diff --git a/server/mailet/mailetcontainer-impl/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java b/server/mailet/mailetcontainer-impl/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java
index 87133c3dc7..bdb2f7c71a 100644
--- a/server/mailet/mailetcontainer-impl/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java
+++ b/server/mailet/mailetcontainer-impl/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java
@@ -22,6 +22,7 @@ package org.apache.james.mailetcontainer.impl;
 import static reactor.core.scheduler.Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE;
 
 import java.io.IOException;
+import java.time.Duration;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -189,14 +190,19 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB
 
         public void dispose() {
             LOGGER.info("start dispose() ...");
+            LOGGER.info("Cancel queue consumption...");
             disposable.dispose();
+            LOGGER.info("Queue consumption canceled, shutting down processor threads...");
+            scheduler.disposeGracefully()
+                .timeout(Duration.ofSeconds(5))
+                .onErrorResume(e -> Mono.empty())
+                .block();
+            LOGGER.info("Thread shutdown completed. Turning off mail queue.");
             try {
                 queue.close();
             } catch (IOException e) {
                 LOGGER.debug("error closing queue", e);
             }
-            LOGGER.info("thread shutdown completed.");
-            scheduler.dispose();
         }
 
         public int getCurrentSpoolCount() {
diff --git a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RemoteDelivery.java b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RemoteDelivery.java
index 93f140dcf3..3ec5436c4f 100644
--- a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RemoteDelivery.java
+++ b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RemoteDelivery.java
@@ -256,7 +256,7 @@ public class RemoteDelivery extends GenericMailet {
      * service.
      */
     @Override
-    public synchronized void destroy() {
+    public void destroy() {
         if (startThreads == ThreadState.START_THREADS) {
             deliveryRunnable.dispose();
         }
diff --git a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/remote/delivery/DeliveryRunnable.java b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/remote/delivery/DeliveryRunnable.java
index 7ed0525bf4..7bedc9e551 100644
--- a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/remote/delivery/DeliveryRunnable.java
+++ b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/remote/delivery/DeliveryRunnable.java
@@ -204,7 +204,10 @@ public class DeliveryRunnable implements Disposable {
     @Override
     public void dispose() {
         disposable.dispose();
-        remoteDeliveryProcessScheduler.dispose();
         remoteDeliveryDequeueScheduler.dispose();
+        remoteDeliveryProcessScheduler.disposeGracefully()
+            .timeout(Duration.ofSeconds(2))
+            .onErrorResume(e -> Mono.empty())
+            .block();
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org