You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2023/03/19 14:35:37 UTC
[james-project] branch 3.7.x updated: JAMES-3891 Graceful shutdown for queue consumers [3.7.x] (#1479)
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch 3.7.x
in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/3.7.x by this push:
new f643ab3657 JAMES-3891 Graceful shutdown for queue consumers [3.7.x] (#1479)
f643ab3657 is described below
commit f643ab36575995e43329068241213fe103d48018
Author: Benoit TELLIER <bt...@linagora.com>
AuthorDate: Sun Mar 19 15:35:29 2023 +0100
JAMES-3891 Graceful shutdown for queue consumers [3.7.x] (#1479)
Graceful shutdown for scheduler is a late addition to the reactor framework,
hence I propose an alternative to it.
---
.../flow/PauseThenCountingExecutionMailet.java | 47 ++++++++++
.../mailets/flow/ShutDownIntegrationTest.java | 104 +++++++++++++++++++++
server/mailet/mailetcontainer-impl/pom.xml | 1 -
.../mailetcontainer/impl/JamesMailSpooler.java | 36 ++++++-
server/mailet/mailets/pom.xml | 4 +
.../james/transport/mailets/RemoteDelivery.java | 2 +-
.../mailets/remote/delivery/DeliveryRunnable.java | 16 ++++
7 files changed, 204 insertions(+), 6 deletions(-)
diff --git a/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/flow/PauseThenCountingExecutionMailet.java b/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/flow/PauseThenCountingExecutionMailet.java
new file mode 100644
index 0000000000..50aceceb94
--- /dev/null
+++ b/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/flow/PauseThenCountingExecutionMailet.java
@@ -0,0 +1,47 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.mailets.flow;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.mailet.Mail;
+import org.apache.mailet.base.GenericMailet;
+
+public class PauseThenCountingExecutionMailet extends GenericMailet {
+ private static final AtomicLong executionCount = new AtomicLong();
+
+ public static void reset() {
+ executionCount.set(0L);
+ }
+
+ public static long executionCount() {
+ return executionCount.get();
+ }
+
+ @Override
+ public void service(Mail mail) {
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ executionCount.incrementAndGet();
+ }
+}
diff --git a/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/flow/ShutDownIntegrationTest.java b/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/flow/ShutDownIntegrationTest.java
new file mode 100644
index 0000000000..15615ce46e
--- /dev/null
+++ b/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/flow/ShutDownIntegrationTest.java
@@ -0,0 +1,104 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.mailets.flow;
+
+import static org.apache.james.mailets.configuration.Constants.DEFAULT_DOMAIN;
+import static org.apache.james.mailets.configuration.Constants.LOCALHOST_IP;
+import static org.apache.james.mailets.configuration.Constants.PASSWORD;
+import static org.apache.james.mailets.configuration.Constants.RECIPIENT;
+import static org.apache.james.mailets.configuration.Constants.awaitAtMostOneMinute;
+
+import java.io.File;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.james.MemoryJamesServerMain;
+import org.apache.james.mailets.TemporaryJamesServer;
+import org.apache.james.mailets.configuration.CommonProcessors;
+import org.apache.james.mailets.configuration.MailetConfiguration;
+import org.apache.james.mailets.configuration.ProcessorConfiguration;
+import org.apache.james.modules.protocols.SmtpGuiceProbe;
+import org.apache.james.probe.DataProbe;
+import org.apache.james.transport.mailets.Null;
+import org.apache.james.transport.matchers.All;
+import org.apache.james.utils.DataProbeImpl;
+import org.apache.james.utils.SMTPMessageSender;
+import org.apache.james.utils.TestIMAPClient;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+
+class ShutDownIntegrationTest {
+ private static final String POSTMASTER = "postmaster@" + DEFAULT_DOMAIN;
+
+ @RegisterExtension
+ public TestIMAPClient testIMAPClient = new TestIMAPClient();
+ @RegisterExtension
+ public SMTPMessageSender messageSender = new SMTPMessageSender(DEFAULT_DOMAIN);
+
+ private TemporaryJamesServer jamesServer;
+
+ @BeforeEach
+ void setUp(@TempDir File temporaryFolder) throws Exception {
+ jamesServer = TemporaryJamesServer.builder()
+ .withBase(MemoryJamesServerMain.IN_MEMORY_SERVER_AGGREGATE_MODULE)
+ .withMailetContainer(TemporaryJamesServer.defaultMailetContainerConfiguration()
+ .postmaster(POSTMASTER)
+ .putProcessor(transport()))
+ .build(temporaryFolder);
+ jamesServer.start();
+
+ DataProbe dataProbe = jamesServer.getProbe(DataProbeImpl.class);
+ dataProbe.addDomain(DEFAULT_DOMAIN);
+ dataProbe.addUser(RECIPIENT, PASSWORD);
+ }
+
+ @Test
+ void shouldAllowADelayToProcessPendingMails() throws Exception {
+ messageSender.connect(LOCALHOST_IP, jamesServer.getProbe(SmtpGuiceProbe.class).getSmtpPort())
+ .authenticate(RECIPIENT, PASSWORD)
+ .sendMessage(RECIPIENT, RECIPIENT);
+
+ AtomicBoolean isOff = new AtomicBoolean(false);
+ Mono.fromRunnable(() -> {
+ jamesServer.shutdown();
+ isOff.set(true);
+ })
+ .subscribeOn(Schedulers.boundedElastic())
+ .subscribe();
+
+ awaitAtMostOneMinute.until(() -> PauseThenCountingExecutionMailet.executionCount() == 1);
+ awaitAtMostOneMinute.until(isOff::get);
+ }
+
+ private ProcessorConfiguration.Builder transport() {
+ return ProcessorConfiguration.transport()
+ .addMailet(MailetConfiguration.builder()
+ .matcher(All.class)
+ .mailet(PauseThenCountingExecutionMailet.class))
+ .addMailet(MailetConfiguration.builder()
+ .matcher(All.class)
+ .mailet(Null.class))
+ .addMailetsFrom(CommonProcessors.transport());
+ }
+}
diff --git a/server/mailet/mailetcontainer-impl/pom.xml b/server/mailet/mailetcontainer-impl/pom.xml
index 62b336035a..c51a575742 100644
--- a/server/mailet/mailetcontainer-impl/pom.xml
+++ b/server/mailet/mailetcontainer-impl/pom.xml
@@ -134,7 +134,6 @@
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
- <scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
diff --git a/server/mailet/mailetcontainer-impl/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java b/server/mailet/mailetcontainer-impl/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java
index 47f88fa9ea..e388af4879 100644
--- a/server/mailet/mailetcontainer-impl/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java
+++ b/server/mailet/mailetcontainer-impl/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java
@@ -19,7 +19,10 @@
package org.apache.james.mailetcontainer.impl;
+import static reactor.core.scheduler.Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE;
+
import java.io.IOException;
+import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
@@ -50,6 +53,7 @@ import org.apache.mailet.Attribute;
import org.apache.mailet.AttributeName;
import org.apache.mailet.AttributeValue;
import org.apache.mailet.Mail;
+import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -59,6 +63,7 @@ import com.google.common.collect.ImmutableList;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
/**
@@ -75,6 +80,8 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB
private final reactor.core.Disposable disposable;
private final MailQueue queue;
private final Configuration configuration;
+ private final Scheduler scheduler;
+ private final Scheduler queueScheduler;
private Runner(MetricFactory metricFactory, GaugeRegistry gaugeRegistry, MailProcessor mailProcessor,
MailRepository errorRepository, MailQueue queue, Configuration configuration) {
@@ -84,7 +91,13 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB
this.queue = queue;
this.configuration = configuration;
- this.disposable = run(queue);
+ scheduler = Schedulers.newBoundedElastic(configuration.getConcurrencyLevel() + 1, DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
+ "spooler");
+
+ queueScheduler = Schedulers.newBoundedElastic(1, DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
+ "queueScheduler");
+
+ disposable = run(queue);
gaugeRegistry.register(SPOOL_PROCESSING + ".inFlight",
processingActive::get);
@@ -92,9 +105,9 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB
private reactor.core.Disposable run(MailQueue queue) {
return Flux.from(queue.deQueue())
- .flatMap(item -> handleOnQueueItem(item).subscribeOn(Schedulers.elastic()), configuration.getConcurrencyLevel())
+ .flatMap(item -> handleOnQueueItem(item).subscribeOn(scheduler), configuration.getConcurrencyLevel())
.onErrorContinue((throwable, item) -> LOGGER.error("Exception processing mail while spooling {}", item, throwable))
- .subscribeOn(Schedulers.elastic())
+ .subscribeOn(queueScheduler)
.subscribe();
}
@@ -180,13 +193,28 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB
public void dispose() {
LOGGER.info("start dispose() ...");
+ LOGGER.info("Cancel queue consumption...");
+ queueScheduler.dispose();
+ LOGGER.info("Queue consumption canceled, awaiting task completion...");
+
+ try {
+ Awaitility.await()
+ .atMost(Duration.ofSeconds(2))
+ .pollDelay(Duration.ofMillis(100))
+ .until(() -> processingActive.get() == 0);
+ } catch (Exception e) {
+ LOGGER.warn("Failure disposing gracefully JamesMailSpooler", e);
+ }
+
+ LOGGER.info("Shutting down processor threads...");
+ scheduler.dispose();
disposable.dispose();
+ LOGGER.info("Thread shutdown completed. Turning off mail queue.");
try {
queue.close();
} catch (IOException e) {
LOGGER.debug("error closing queue", e);
}
- LOGGER.info("thread shutdown completed.");
}
public int getCurrentSpoolCount() {
diff --git a/server/mailet/mailets/pom.xml b/server/mailet/mailets/pom.xml
index 718631f3f6..147ef846bf 100644
--- a/server/mailet/mailets/pom.xml
+++ b/server/mailet/mailets/pom.xml
@@ -231,6 +231,10 @@
<groupId>org.apache.james.jspf</groupId>
<artifactId>apache-jspf-resolver</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ </dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>java-hamcrest</artifactId>
diff --git a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RemoteDelivery.java b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RemoteDelivery.java
index 633cf0775d..a78c724884 100644
--- a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RemoteDelivery.java
+++ b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RemoteDelivery.java
@@ -253,7 +253,7 @@ public class RemoteDelivery extends GenericMailet {
* service.
*/
@Override
- public synchronized void destroy() {
+ public void destroy() {
if (startThreads == ThreadState.START_THREADS) {
deliveryRunnable.dispose();
}
diff --git a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/remote/delivery/DeliveryRunnable.java b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/remote/delivery/DeliveryRunnable.java
index 25d3a2d549..a24b4f9b33 100644
--- a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/remote/delivery/DeliveryRunnable.java
+++ b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/remote/delivery/DeliveryRunnable.java
@@ -24,6 +24,7 @@ import static org.apache.james.transport.mailets.remote.delivery.Bouncer.IS_DELI
import java.io.Closeable;
import java.time.Duration;
import java.util.Date;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import org.apache.james.dnsservice.api.DNSService;
@@ -38,6 +39,7 @@ import org.apache.mailet.Attribute;
import org.apache.mailet.AttributeValue;
import org.apache.mailet.Mail;
import org.apache.mailet.MailetContext;
+import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -67,6 +69,7 @@ public class DeliveryRunnable implements Disposable {
private final MailDelivrer mailDelivrer;
private final Supplier<Date> dateSupplier;
private final MailetContext mailetContext;
+ private final AtomicInteger pendingDeliveries;
private Disposable disposable;
private Scheduler remoteDeliveryProcessScheduler;
private Scheduler remoteDeliveryDequeueScheduler;
@@ -89,6 +92,7 @@ public class DeliveryRunnable implements Disposable {
this.dateSupplier = dateSupplier;
this.metricFactory = metricFactory;
this.mailetContext = mailetContext;
+ this.pendingDeliveries = new AtomicInteger(0);
}
public void start() {
@@ -111,6 +115,7 @@ public class DeliveryRunnable implements Disposable {
private Mono<Void> processMail(MailQueue.MailQueueItem queueItem) {
return Mono.create(sink -> {
Mail mail = queueItem.getMail();
+ pendingDeliveries.incrementAndGet();
try (Closeable closeable =
MDCBuilder.create()
@@ -136,6 +141,7 @@ public class DeliveryRunnable implements Disposable {
sink.error(e);
} finally {
LifecycleUtil.dispose(mail);
+ pendingDeliveries.decrementAndGet();
}
});
}
@@ -204,6 +210,16 @@ public class DeliveryRunnable implements Disposable {
@Override
public void dispose() {
disposable.dispose();
+
+ try {
+ Awaitility.await()
+ .atMost(Duration.ofSeconds(2))
+ .pollDelay(Duration.ofMillis(100))
+ .until(() -> pendingDeliveries.get() == 0);
+ } catch (Exception e) {
+ LOGGER.warn("Failure disposing gracefully RemoteDelivery", e);
+ }
+
remoteDeliveryProcessScheduler.dispose();
remoteDeliveryDequeueScheduler.dispose();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org