You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2023/03/16 09:17:18 UTC
[james-project] 02/02: JAMES-3891 Integration test for graceful shutdown
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 4e92cdf6095ff42011abc48c4b41d0eba15c86e8
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Mon Mar 13 15:14:55 2023 +0700
JAMES-3891 Integration test for graceful shutdown
Fix:
- Dedicated scheduler for queue consumption. This allows to stop consumption
while still processing items.
---
.../flow/PauseThenCountingExecutionMailet.java | 47 ++++++++++
.../mailets/flow/ShutDownIntegrationTest.java | 104 +++++++++++++++++++++
.../mailetcontainer/impl/JamesMailSpooler.java | 9 +-
3 files changed, 158 insertions(+), 2 deletions(-)
diff --git a/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/flow/PauseThenCountingExecutionMailet.java b/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/flow/PauseThenCountingExecutionMailet.java
new file mode 100644
index 0000000000..50aceceb94
--- /dev/null
+++ b/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/flow/PauseThenCountingExecutionMailet.java
@@ -0,0 +1,47 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.mailets.flow;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.mailet.Mail;
+import org.apache.mailet.base.GenericMailet;
+
+public class PauseThenCountingExecutionMailet extends GenericMailet {
+ private static final AtomicLong executionCount = new AtomicLong();
+
+ public static void reset() {
+ executionCount.set(0L);
+ }
+
+ public static long executionCount() {
+ return executionCount.get();
+ }
+
+ @Override
+ public void service(Mail mail) {
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ executionCount.incrementAndGet();
+ }
+}
diff --git a/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/flow/ShutDownIntegrationTest.java b/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/flow/ShutDownIntegrationTest.java
new file mode 100644
index 0000000000..15615ce46e
--- /dev/null
+++ b/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/flow/ShutDownIntegrationTest.java
@@ -0,0 +1,104 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.mailets.flow;
+
+import static org.apache.james.mailets.configuration.Constants.DEFAULT_DOMAIN;
+import static org.apache.james.mailets.configuration.Constants.LOCALHOST_IP;
+import static org.apache.james.mailets.configuration.Constants.PASSWORD;
+import static org.apache.james.mailets.configuration.Constants.RECIPIENT;
+import static org.apache.james.mailets.configuration.Constants.awaitAtMostOneMinute;
+
+import java.io.File;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.james.MemoryJamesServerMain;
+import org.apache.james.mailets.TemporaryJamesServer;
+import org.apache.james.mailets.configuration.CommonProcessors;
+import org.apache.james.mailets.configuration.MailetConfiguration;
+import org.apache.james.mailets.configuration.ProcessorConfiguration;
+import org.apache.james.modules.protocols.SmtpGuiceProbe;
+import org.apache.james.probe.DataProbe;
+import org.apache.james.transport.mailets.Null;
+import org.apache.james.transport.matchers.All;
+import org.apache.james.utils.DataProbeImpl;
+import org.apache.james.utils.SMTPMessageSender;
+import org.apache.james.utils.TestIMAPClient;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+
+class ShutDownIntegrationTest {
+ private static final String POSTMASTER = "postmaster@" + DEFAULT_DOMAIN;
+
+ @RegisterExtension
+ public TestIMAPClient testIMAPClient = new TestIMAPClient();
+ @RegisterExtension
+ public SMTPMessageSender messageSender = new SMTPMessageSender(DEFAULT_DOMAIN);
+
+ private TemporaryJamesServer jamesServer;
+
+ @BeforeEach
+ void setUp(@TempDir File temporaryFolder) throws Exception {
+ jamesServer = TemporaryJamesServer.builder()
+ .withBase(MemoryJamesServerMain.IN_MEMORY_SERVER_AGGREGATE_MODULE)
+ .withMailetContainer(TemporaryJamesServer.defaultMailetContainerConfiguration()
+ .postmaster(POSTMASTER)
+ .putProcessor(transport()))
+ .build(temporaryFolder);
+ jamesServer.start();
+
+ DataProbe dataProbe = jamesServer.getProbe(DataProbeImpl.class);
+ dataProbe.addDomain(DEFAULT_DOMAIN);
+ dataProbe.addUser(RECIPIENT, PASSWORD);
+ }
+
+ @Test
+ void shouldAllowADelayToProcessPendingMails() throws Exception {
+ messageSender.connect(LOCALHOST_IP, jamesServer.getProbe(SmtpGuiceProbe.class).getSmtpPort())
+ .authenticate(RECIPIENT, PASSWORD)
+ .sendMessage(RECIPIENT, RECIPIENT);
+
+ AtomicBoolean isOff = new AtomicBoolean(false);
+ Mono.fromRunnable(() -> {
+ jamesServer.shutdown();
+ isOff.set(true);
+ })
+ .subscribeOn(Schedulers.boundedElastic())
+ .subscribe();
+
+ awaitAtMostOneMinute.until(() -> PauseThenCountingExecutionMailet.executionCount() == 1);
+ awaitAtMostOneMinute.until(isOff::get);
+ }
+
+ private ProcessorConfiguration.Builder transport() {
+ return ProcessorConfiguration.transport()
+ .addMailet(MailetConfiguration.builder()
+ .matcher(All.class)
+ .mailet(PauseThenCountingExecutionMailet.class))
+ .addMailet(MailetConfiguration.builder()
+ .matcher(All.class)
+ .mailet(Null.class))
+ .addMailetsFrom(CommonProcessors.transport());
+ }
+}
diff --git a/server/mailet/mailetcontainer-impl/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java b/server/mailet/mailetcontainer-impl/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java
index bdb2f7c71a..d940188ecf 100644
--- a/server/mailet/mailetcontainer-impl/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java
+++ b/server/mailet/mailetcontainer-impl/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java
@@ -82,6 +82,7 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB
private final MailQueue queue;
private final Configuration configuration;
private final Scheduler scheduler;
+ private final Scheduler queueScheduler;
private Runner(MetricFactory metricFactory, GaugeRegistry gaugeRegistry, MailProcessor mailProcessor,
MailRepository errorRepository, MailQueue queue, Configuration configuration) {
@@ -94,6 +95,9 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB
scheduler = Schedulers.newBoundedElastic(configuration.getConcurrencyLevel() + 1, DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
"spooler");
+ queueScheduler = Schedulers.newBoundedElastic(1, DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
+ "queueScheduler");
+
this.disposable = run(queue);
gaugeRegistry.register(SPOOL_PROCESSING + ".inFlight",
@@ -104,7 +108,7 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB
return Flux.from(queue.deQueue())
.flatMap(item -> handleOnQueueItem(item).subscribeOn(scheduler), configuration.getConcurrencyLevel())
.onErrorContinue((throwable, item) -> LOGGER.error("Exception processing mail while spooling {}", item, throwable))
- .subscribeOn(scheduler)
+ .subscribeOn(queueScheduler)
.subscribe();
}
@@ -191,12 +195,13 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB
public void dispose() {
LOGGER.info("start dispose() ...");
LOGGER.info("Cancel queue consumption...");
- disposable.dispose();
+ queueScheduler.dispose();
LOGGER.info("Queue consumption canceled, shutting down processor threads...");
scheduler.disposeGracefully()
.timeout(Duration.ofSeconds(5))
.onErrorResume(e -> Mono.empty())
.block();
+ disposable.dispose();
LOGGER.info("Thread shutdown completed. Turning off mail queue.");
try {
queue.close();
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org