You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2023/03/19 14:35:37 UTC

[james-project] branch 3.7.x updated: JAMES-3891 Graceful shutdown for queue consumers [3.7.x] (#1479)

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch 3.7.x
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/3.7.x by this push:
     new f643ab3657 JAMES-3891 Graceful shutdown for queue consumers [3.7.x] (#1479)
f643ab3657 is described below

commit f643ab36575995e43329068241213fe103d48018
Author: Benoit TELLIER <bt...@linagora.com>
AuthorDate: Sun Mar 19 15:35:29 2023 +0100

    JAMES-3891 Graceful shutdown for queue consumers [3.7.x] (#1479)
    
    Graceful shutdown for scheduler is a late addition to the reactor framework,
    hence I propose an alternative to it.
---
 .../flow/PauseThenCountingExecutionMailet.java     |  47 ++++++++++
 .../mailets/flow/ShutDownIntegrationTest.java      | 104 +++++++++++++++++++++
 server/mailet/mailetcontainer-impl/pom.xml         |   1 -
 .../mailetcontainer/impl/JamesMailSpooler.java     |  36 ++++++-
 server/mailet/mailets/pom.xml                      |   4 +
 .../james/transport/mailets/RemoteDelivery.java    |   2 +-
 .../mailets/remote/delivery/DeliveryRunnable.java  |  16 ++++
 7 files changed, 204 insertions(+), 6 deletions(-)

diff --git a/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/flow/PauseThenCountingExecutionMailet.java b/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/flow/PauseThenCountingExecutionMailet.java
new file mode 100644
index 0000000000..50aceceb94
--- /dev/null
+++ b/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/flow/PauseThenCountingExecutionMailet.java
@@ -0,0 +1,64 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailets.flow;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.mailet.Mail;
+import org.apache.mailet.base.GenericMailet;
+
+/**
+ * Test mailet that pauses for 500 ms on each mail and then bumps a JVM-wide counter.
+ *
+ * <p>The counter is static, so it is shared by every instance of this mailet; call
+ * {@link #reset()} before a test that asserts on {@link #executionCount()}.
+ */
+public class PauseThenCountingExecutionMailet extends GenericMailet {
+    private static final AtomicLong executionCount = new AtomicLong();
+
+    /** Sets the shared execution counter back to zero. */
+    public static void reset() {
+        executionCount.set(0L);
+    }
+
+    /** Returns the current value of the shared execution counter. */
+    public static long executionCount() {
+        return executionCount.get();
+    }
+
+    /**
+     * Sleeps for 500 ms, then increments the shared counter.
+     *
+     * @param mail the mail being processed; it is not inspected or modified
+     * @throws RuntimeException wrapping the {@link InterruptedException} if the sleep is interrupted;
+     *         the counter is not incremented in that case
+     */
+    @Override
+    public void service(Mail mail) {
+        try {
+            Thread.sleep(500);
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag so the calling thread can still observe the interruption.
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+        }
+        executionCount.incrementAndGet();
+    }
+}
diff --git a/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/flow/ShutDownIntegrationTest.java b/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/flow/ShutDownIntegrationTest.java
new file mode 100644
index 0000000000..15615ce46e
--- /dev/null
+++ b/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/flow/ShutDownIntegrationTest.java
@@ -0,0 +1,126 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailets.flow;
+
+import static org.apache.james.mailets.configuration.Constants.DEFAULT_DOMAIN;
+import static org.apache.james.mailets.configuration.Constants.LOCALHOST_IP;
+import static org.apache.james.mailets.configuration.Constants.PASSWORD;
+import static org.apache.james.mailets.configuration.Constants.RECIPIENT;
+import static org.apache.james.mailets.configuration.Constants.awaitAtMostOneMinute;
+
+import java.io.File;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.james.MemoryJamesServerMain;
+import org.apache.james.mailets.TemporaryJamesServer;
+import org.apache.james.mailets.configuration.CommonProcessors;
+import org.apache.james.mailets.configuration.MailetConfiguration;
+import org.apache.james.mailets.configuration.ProcessorConfiguration;
+import org.apache.james.modules.protocols.SmtpGuiceProbe;
+import org.apache.james.probe.DataProbe;
+import org.apache.james.transport.mailets.Null;
+import org.apache.james.transport.matchers.All;
+import org.apache.james.utils.DataProbeImpl;
+import org.apache.james.utils.SMTPMessageSender;
+import org.apache.james.utils.TestIMAPClient;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+
+/**
+ * Integration test for server shutdown while a mail is still being processed.
+ *
+ * <p>The transport processor starts with {@link PauseThenCountingExecutionMailet}, which sleeps
+ * before counting, so a mail sent right before shutdown is likely still pending when shutdown begins.
+ */
+class ShutDownIntegrationTest {
+    private static final String POSTMASTER = "postmaster@" + DEFAULT_DOMAIN;
+
+    @RegisterExtension
+    public TestIMAPClient testIMAPClient = new TestIMAPClient();
+    @RegisterExtension
+    public SMTPMessageSender messageSender = new SMTPMessageSender(DEFAULT_DOMAIN);
+
+    private TemporaryJamesServer jamesServer;
+
+    /**
+     * Resets the mailet counter, starts an in-memory server and registers the test domain and user.
+     */
+    @BeforeEach
+    void setUp(@TempDir File temporaryFolder) throws Exception {
+        // The mailet counter is static: zero it so the "== 1" check in the test does not depend
+        // on anything that ran earlier in the same JVM.
+        PauseThenCountingExecutionMailet.reset();
+
+        jamesServer = TemporaryJamesServer.builder()
+            .withBase(MemoryJamesServerMain.IN_MEMORY_SERVER_AGGREGATE_MODULE)
+            .withMailetContainer(TemporaryJamesServer.defaultMailetContainerConfiguration()
+                .postmaster(POSTMASTER)
+                .putProcessor(transport()))
+            .build(temporaryFolder);
+        jamesServer.start();
+
+        DataProbe dataProbe = jamesServer.getProbe(DataProbeImpl.class);
+        dataProbe.addDomain(DEFAULT_DOMAIN);
+        dataProbe.addUser(RECIPIENT, PASSWORD);
+    }
+
+    /**
+     * Sends one mail, triggers shutdown on another thread, then waits for the mail to be counted
+     * and for the shutdown call to return.
+     */
+    @Test
+    void shouldAllowADelayToProcessPendingMails() throws Exception {
+        messageSender.connect(LOCALHOST_IP, jamesServer.getProbe(SmtpGuiceProbe.class).getSmtpPort())
+            .authenticate(RECIPIENT, PASSWORD)
+            .sendMessage(RECIPIENT, RECIPIENT);
+
+        AtomicBoolean isOff = new AtomicBoolean(false);
+        // Run shutdown off the test thread so the test can observe the counter while it proceeds.
+        Mono.fromRunnable(() -> {
+            jamesServer.shutdown();
+            isOff.set(true);
+        })
+            .subscribeOn(Schedulers.boundedElastic())
+            .subscribe();
+
+        awaitAtMostOneMinute.until(() -> PauseThenCountingExecutionMailet.executionCount() == 1);
+        awaitAtMostOneMinute.until(isOff::get);
+    }
+
+    /**
+     * Builds the transport processor: the pausing/counting mailet first, then {@link Null},
+     * followed by the mailets of {@link CommonProcessors#transport()}.
+     */
+    private ProcessorConfiguration.Builder transport() {
+        return ProcessorConfiguration.transport()
+            .addMailet(MailetConfiguration.builder()
+                .matcher(All.class)
+                .mailet(PauseThenCountingExecutionMailet.class))
+            .addMailet(MailetConfiguration.builder()
+                .matcher(All.class)
+                .mailet(Null.class))
+            .addMailetsFrom(CommonProcessors.transport());
+    }
+}
diff --git a/server/mailet/mailetcontainer-impl/pom.xml b/server/mailet/mailetcontainer-impl/pom.xml
index 62b336035a..c51a575742 100644
--- a/server/mailet/mailetcontainer-impl/pom.xml
+++ b/server/mailet/mailetcontainer-impl/pom.xml
@@ -134,7 +134,6 @@
         <dependency>
             <groupId>org.awaitility</groupId>
             <artifactId>awaitility</artifactId>
-            <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>org.mockito</groupId>
diff --git a/server/mailet/mailetcontainer-impl/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java b/server/mailet/mailetcontainer-impl/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java
index 47f88fa9ea..e388af4879 100644
--- a/server/mailet/mailetcontainer-impl/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java
+++ b/server/mailet/mailetcontainer-impl/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java
@@ -19,7 +19,10 @@
 
 package org.apache.james.mailetcontainer.impl;
 
+import static reactor.core.scheduler.Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE;
+
 import java.io.IOException;
+import java.time.Duration;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -50,6 +53,7 @@ import org.apache.mailet.Attribute;
 import org.apache.mailet.AttributeName;
 import org.apache.mailet.AttributeValue;
 import org.apache.mailet.Mail;
+import org.awaitility.Awaitility;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -59,6 +63,7 @@ import com.google.common.collect.ImmutableList;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Scheduler;
 import reactor.core.scheduler.Schedulers;
 
 /**
@@ -75,6 +80,8 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB
         private final reactor.core.Disposable disposable;
         private final MailQueue queue;
         private final Configuration configuration;
+        private final Scheduler scheduler;
+        private final Scheduler queueScheduler;
 
         private Runner(MetricFactory metricFactory, GaugeRegistry gaugeRegistry, MailProcessor mailProcessor,
                        MailRepository errorRepository, MailQueue queue, Configuration configuration) {
@@ -84,7 +91,13 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB
             this.queue = queue;
             this.configuration = configuration;
 
-            this.disposable = run(queue);
+            scheduler = Schedulers.newBoundedElastic(configuration.getConcurrencyLevel() + 1, DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
+                "spooler");
+
+            queueScheduler = Schedulers.newBoundedElastic(1, DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
+                "queueScheduler");
+
+            disposable = run(queue);
 
             gaugeRegistry.register(SPOOL_PROCESSING + ".inFlight",
                 processingActive::get);
@@ -92,9 +105,9 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB
 
         private reactor.core.Disposable run(MailQueue queue) {
             return Flux.from(queue.deQueue())
-                .flatMap(item -> handleOnQueueItem(item).subscribeOn(Schedulers.elastic()), configuration.getConcurrencyLevel())
+                .flatMap(item -> handleOnQueueItem(item).subscribeOn(scheduler), configuration.getConcurrencyLevel()) // items run on the dedicated "spooler" scheduler
                 .onErrorContinue((throwable, item) -> LOGGER.error("Exception processing mail while spooling {}", item, throwable))
-                .subscribeOn(Schedulers.elastic())
+                .subscribeOn(queueScheduler) // dequeue subscription runs on the separate "queueScheduler"
                 .subscribe();
         }
 
@@ -180,13 +193,28 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB
 
         public void dispose() {
             LOGGER.info("start dispose() ...");
+            LOGGER.info("Cancel queue consumption...");
+            queueScheduler.dispose(); // dispose the scheduler the dequeue subscription was started on
+            LOGGER.info("Queue consumption canceled, awaiting task completion...");
+
+            try {
+                Awaitility.await() // poll until the in-flight counter drops to zero
+                    .atMost(Duration.ofSeconds(2)) // bounded grace period: stop waiting after 2 s
+                    .pollDelay(Duration.ofMillis(100))
+                    .until(() -> processingActive.get() == 0);
+            } catch (Exception e) { // best effort: log and carry on with the rest of the shutdown
+                LOGGER.warn("Failure disposing gracefully JamesMailSpooler", e);
+            }
+
+            LOGGER.info("Shutting down processor threads...");
+            scheduler.dispose(); // scheduler that handleOnQueueItem is subscribed on in run()
             disposable.dispose();
+            LOGGER.info("Thread shutdown completed. Turning off mail queue.");
             try {
                 queue.close();
             } catch (IOException e) {
                 LOGGER.debug("error closing queue", e);
             }
-            LOGGER.info("thread shutdown completed.");
         }
 
         public int getCurrentSpoolCount() {
diff --git a/server/mailet/mailets/pom.xml b/server/mailet/mailets/pom.xml
index 718631f3f6..147ef846bf 100644
--- a/server/mailet/mailets/pom.xml
+++ b/server/mailet/mailets/pom.xml
@@ -231,6 +231,10 @@
             <groupId>org.apache.james.jspf</groupId>
             <artifactId>apache-jspf-resolver</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.awaitility</groupId>
+            <artifactId>awaitility</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.hamcrest</groupId>
             <artifactId>java-hamcrest</artifactId>
diff --git a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RemoteDelivery.java b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RemoteDelivery.java
index 633cf0775d..a78c724884 100644
--- a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RemoteDelivery.java
+++ b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RemoteDelivery.java
@@ -253,7 +253,7 @@ public class RemoteDelivery extends GenericMailet {
      * service.
      */
     @Override
-    public synchronized void destroy() {
+    public void destroy() {
         if (startThreads == ThreadState.START_THREADS) {
             deliveryRunnable.dispose();
         }
diff --git a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/remote/delivery/DeliveryRunnable.java b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/remote/delivery/DeliveryRunnable.java
index 25d3a2d549..a24b4f9b33 100644
--- a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/remote/delivery/DeliveryRunnable.java
+++ b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/remote/delivery/DeliveryRunnable.java
@@ -24,6 +24,7 @@ import static org.apache.james.transport.mailets.remote.delivery.Bouncer.IS_DELI
 import java.io.Closeable;
 import java.time.Duration;
 import java.util.Date;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
 
 import org.apache.james.dnsservice.api.DNSService;
@@ -38,6 +39,7 @@ import org.apache.mailet.Attribute;
 import org.apache.mailet.AttributeValue;
 import org.apache.mailet.Mail;
 import org.apache.mailet.MailetContext;
+import org.awaitility.Awaitility;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -67,6 +69,7 @@ public class DeliveryRunnable implements Disposable {
     private final MailDelivrer mailDelivrer;
     private final Supplier<Date> dateSupplier;
     private final MailetContext mailetContext;
+    private final AtomicInteger pendingDeliveries;
     private Disposable disposable;
     private Scheduler remoteDeliveryProcessScheduler;
     private Scheduler remoteDeliveryDequeueScheduler;
@@ -89,6 +92,7 @@ public class DeliveryRunnable implements Disposable {
         this.dateSupplier = dateSupplier;
         this.metricFactory = metricFactory;
         this.mailetContext = mailetContext;
+        this.pendingDeliveries = new AtomicInteger(0);
     }
 
     public void start() {
@@ -111,6 +115,7 @@ public class DeliveryRunnable implements Disposable {
     private Mono<Void> processMail(MailQueue.MailQueueItem queueItem) {
         return Mono.create(sink -> {
             Mail mail = queueItem.getMail();
+            pendingDeliveries.incrementAndGet();
 
             try (Closeable closeable =
                      MDCBuilder.create()
@@ -136,6 +141,7 @@ public class DeliveryRunnable implements Disposable {
                 sink.error(e);
             } finally {
                 LifecycleUtil.dispose(mail);
+                pendingDeliveries.decrementAndGet();
             }
         });
     }
@@ -204,6 +210,16 @@ public class DeliveryRunnable implements Disposable {
     @Override
     public void dispose() {
         disposable.dispose();
+
+        try {
+            Awaitility.await() // poll until the pending-deliveries counter drops to zero
+                .atMost(Duration.ofSeconds(2)) // bounded grace period: stop waiting after 2 s
+                .pollDelay(Duration.ofMillis(100))
+                .until(() -> pendingDeliveries.get() == 0);
+        } catch (Exception e) { // best effort: log and continue with disposing the schedulers
+            LOGGER.warn("Failure disposing gracefully RemoteDelivery", e);
+        }
+
         remoteDeliveryProcessScheduler.dispose();
         remoteDeliveryDequeueScheduler.dispose();
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org