You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2023/03/16 09:17:18 UTC

[james-project] 02/02: JAMES-3891 Integration test for graceful shutdown

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 4e92cdf6095ff42011abc48c4b41d0eba15c86e8
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Mon Mar 13 15:14:55 2023 +0700

    JAMES-3891 Integration test for graceful shutdown
    
    Fix:
     - Dedicated scheduler for queue consumption. This allows consumption to be
     stopped while in-flight items are still being processed.
---
 .../flow/PauseThenCountingExecutionMailet.java     |  47 ++++++++++
 .../mailets/flow/ShutDownIntegrationTest.java      | 104 +++++++++++++++++++++
 .../mailetcontainer/impl/JamesMailSpooler.java     |   9 +-
 3 files changed, 158 insertions(+), 2 deletions(-)

diff --git a/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/flow/PauseThenCountingExecutionMailet.java b/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/flow/PauseThenCountingExecutionMailet.java
new file mode 100644
index 0000000000..50aceceb94
--- /dev/null
+++ b/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/flow/PauseThenCountingExecutionMailet.java
@@ -0,0 +1,64 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailets.flow;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.mailet.Mail;
+import org.apache.mailet.base.GenericMailet;
+
+/**
+ * Test mailet that pauses for a fixed delay and then increments a static execution counter.
+ *
+ * <p>The counter is static, hence shared by every instance of this mailet; call
+ * {@link #reset()} before asserting on {@link #executionCount()}.
+ */
+public class PauseThenCountingExecutionMailet extends GenericMailet {
+    private static final AtomicLong executionCount = new AtomicLong();
+
+    /** Resets the shared execution counter to zero. */
+    public static void reset() {
+        executionCount.set(0L);
+    }
+
+    /** Returns the current value of the shared execution counter. */
+    public static long executionCount() {
+        return executionCount.get();
+    }
+
+    /**
+     * Sleeps for 500 ms, then records one completed execution.
+     *
+     * @param mail the mail being processed; it is neither inspected nor modified
+     * @throws RuntimeException wrapping the {@link InterruptedException} when the pause is
+     *         interrupted; the execution is then not counted
+     */
+    @Override
+    public void service(Mail mail) {
+        try {
+            Thread.sleep(500);
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag so the calling thread can still observe the interruption.
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+        }
+        executionCount.incrementAndGet();
+    }
+}
diff --git a/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/flow/ShutDownIntegrationTest.java b/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/flow/ShutDownIntegrationTest.java
new file mode 100644
index 0000000000..15615ce46e
--- /dev/null
+++ b/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/flow/ShutDownIntegrationTest.java
@@ -0,0 +1,118 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailets.flow;
+
+import static org.apache.james.mailets.configuration.Constants.DEFAULT_DOMAIN;
+import static org.apache.james.mailets.configuration.Constants.LOCALHOST_IP;
+import static org.apache.james.mailets.configuration.Constants.PASSWORD;
+import static org.apache.james.mailets.configuration.Constants.RECIPIENT;
+import static org.apache.james.mailets.configuration.Constants.awaitAtMostOneMinute;
+
+import java.io.File;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.james.MemoryJamesServerMain;
+import org.apache.james.mailets.TemporaryJamesServer;
+import org.apache.james.mailets.configuration.CommonProcessors;
+import org.apache.james.mailets.configuration.MailetConfiguration;
+import org.apache.james.mailets.configuration.ProcessorConfiguration;
+import org.apache.james.modules.protocols.SmtpGuiceProbe;
+import org.apache.james.probe.DataProbe;
+import org.apache.james.transport.mailets.Null;
+import org.apache.james.transport.matchers.All;
+import org.apache.james.utils.DataProbeImpl;
+import org.apache.james.utils.SMTPMessageSender;
+import org.apache.james.utils.TestIMAPClient;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+
+/**
+ * Integration test checking that a mail submitted right before shutdown is still processed by
+ * {@link PauseThenCountingExecutionMailet} and that the shutdown call itself returns.
+ */
+class ShutDownIntegrationTest {
+    private static final String POSTMASTER = "postmaster@" + DEFAULT_DOMAIN;
+
+    @RegisterExtension
+    public TestIMAPClient testIMAPClient = new TestIMAPClient();
+    @RegisterExtension
+    public SMTPMessageSender messageSender = new SMTPMessageSender(DEFAULT_DOMAIN);
+
+    private TemporaryJamesServer jamesServer;
+
+    @BeforeEach
+    void setUp(@TempDir File temporaryFolder) throws Exception {
+        // The mailet counter is static: clear it so the "== 1" expectation of the test does not
+        // depend on executions recorded earlier in the same JVM.
+        PauseThenCountingExecutionMailet.reset();
+
+        jamesServer = TemporaryJamesServer.builder()
+            .withBase(MemoryJamesServerMain.IN_MEMORY_SERVER_AGGREGATE_MODULE)
+            .withMailetContainer(TemporaryJamesServer.defaultMailetContainerConfiguration()
+                .postmaster(POSTMASTER)
+                .putProcessor(transport()))
+            .build(temporaryFolder);
+        jamesServer.start();
+
+        DataProbe dataProbe = jamesServer.getProbe(DataProbeImpl.class);
+        dataProbe.addDomain(DEFAULT_DOMAIN);
+        dataProbe.addUser(RECIPIENT, PASSWORD);
+    }
+
+    @Test
+    void shouldAllowADelayToProcessPendingMails() throws Exception {
+        messageSender.connect(LOCALHOST_IP, jamesServer.getProbe(SmtpGuiceProbe.class).getSmtpPort())
+            .authenticate(RECIPIENT, PASSWORD)
+            .sendMessage(RECIPIENT, RECIPIENT);
+
+        // Shutdown runs on another thread so this thread can watch the mailet counter while the
+        // shutdown is in progress; isOff flips only once shutdown() has returned.
+        AtomicBoolean isOff = new AtomicBoolean(false);
+        Mono.fromRunnable(() -> {
+            jamesServer.shutdown();
+            isOff.set(true);
+        })
+            .subscribeOn(Schedulers.boundedElastic())
+            .subscribe();
+
+        awaitAtMostOneMinute.until(() -> PauseThenCountingExecutionMailet.executionCount() == 1);
+        awaitAtMostOneMinute.until(isOff::get);
+    }
+
+    /**
+     * Builds the transport processor: the pausing/counting mailet first, then {@code Null},
+     * followed by the mailets of {@code CommonProcessors.transport()}.
+     */
+    private ProcessorConfiguration.Builder transport() {
+        return ProcessorConfiguration.transport()
+            .addMailet(MailetConfiguration.builder()
+                .matcher(All.class)
+                .mailet(PauseThenCountingExecutionMailet.class))
+            .addMailet(MailetConfiguration.builder()
+                .matcher(All.class)
+                .mailet(Null.class))
+            .addMailetsFrom(CommonProcessors.transport());
+    }
+}
diff --git a/server/mailet/mailetcontainer-impl/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java b/server/mailet/mailetcontainer-impl/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java
index bdb2f7c71a..d940188ecf 100644
--- a/server/mailet/mailetcontainer-impl/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java
+++ b/server/mailet/mailetcontainer-impl/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java
@@ -82,6 +82,7 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB
         private final MailQueue queue;
         private final Configuration configuration;
         private final Scheduler scheduler;
+        private final Scheduler queueScheduler;
 
         private Runner(MetricFactory metricFactory, GaugeRegistry gaugeRegistry, MailProcessor mailProcessor,
                        MailRepository errorRepository, MailQueue queue, Configuration configuration) {
@@ -94,6 +95,9 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB
             scheduler = Schedulers.newBoundedElastic(configuration.getConcurrencyLevel() + 1, DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
                 "spooler");
 
+            queueScheduler = Schedulers.newBoundedElastic(1, DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
+                "queueScheduler");
+
             this.disposable = run(queue);
 
             gaugeRegistry.register(SPOOL_PROCESSING + ".inFlight",
@@ -104,7 +108,7 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB
             return Flux.from(queue.deQueue())
                 .flatMap(item -> handleOnQueueItem(item).subscribeOn(scheduler), configuration.getConcurrencyLevel())
                 .onErrorContinue((throwable, item) -> LOGGER.error("Exception processing mail while spooling {}", item, throwable))
-                .subscribeOn(scheduler)
+                .subscribeOn(queueScheduler)
                 .subscribe();
         }
 
@@ -191,12 +195,13 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB
         public void dispose() {
             LOGGER.info("start dispose() ...");
             LOGGER.info("Cancel queue consumption...");
-            disposable.dispose();
+            queueScheduler.dispose();
             LOGGER.info("Queue consumption canceled, shutting down processor threads...");
             scheduler.disposeGracefully()
                 .timeout(Duration.ofSeconds(5))
                 .onErrorResume(e -> Mono.empty())
                 .block();
+            disposable.dispose();
             LOGGER.info("Thread shutdown completed. Turning off mail queue.");
             try {
                 queue.close();


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org