Posted to notifications@james.apache.org by bt...@apache.org on 2023/03/16 09:17:16 UTC

[james-project] branch master updated (433a161781 -> 4e92cdf609)

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


    from 433a161781 JAMES-3534 Identity/set - update document (#1487)
     new 1e25f478e6 JAMES-3891 Memory mailqueue should not display worrying logs upon shutdown
     new 4e92cdf609 JAMES-3891 Integration test for graceful shutdown

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 ....java => PauseThenCountingExecutionMailet.java} | 10 ++--
 .../ShutDownIntegrationTest.java}                  | 68 +++++++---------------
 .../mailetcontainer/impl/JamesMailSpooler.java     |  9 ++-
 .../james/queue/memory/MemoryMailQueueFactory.java |  8 ++-
 4 files changed, 40 insertions(+), 55 deletions(-)
 copy server/mailet/integration-testing/src/test/java/org/apache/james/mailets/flow/{CountingExecutionTerminatingMailet.java => PauseThenCountingExecutionMailet.java} (88%)
 copy server/mailet/integration-testing/src/test/java/org/apache/james/mailets/{SenderIsLocalIntegrationTest.java => flow/ShutDownIntegrationTest.java} (60%)


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 02/02: JAMES-3891 Integration test for graceful shutdown

Posted by bt...@apache.org.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 4e92cdf6095ff42011abc48c4b41d0eba15c86e8
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Mon Mar 13 15:14:55 2023 +0700

    JAMES-3891 Integration test for graceful shutdown
    
    Fix:
     - Dedicated scheduler for queue consumption. This allows consumption
     to be stopped while in-flight items are still being processed.
---
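Note on the fix described in the commit message: the idea of stopping queue
consumption first and only then draining in-flight work can be sketched with
plain java.util.concurrent executors instead of Reactor schedulers. This is
an illustration only, not the JamesMailSpooler code. The class name, its
fields and the runDemo() helper are hypothetical, and the 200 ms pause simply
stands in for a slow mailet.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: names are illustrative and do not exist in James.
class GracefulShutdownSketch {
    final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    final AtomicLong processed = new AtomicLong();
    final CountDownLatch started = new CountDownLatch(1);
    // Dedicated single thread that only pulls items from the queue.
    final ExecutorService consumer = Executors.newSingleThreadExecutor();
    // Separate pool that performs the (slow) processing.
    final ExecutorService workers = Executors.newFixedThreadPool(2);

    void start() {
        consumer.execute(() -> {
            try {
                while (true) {
                    String item = queue.take();
                    workers.execute(() -> process(item));
                }
            } catch (InterruptedException e) {
                // Interrupted by shutdownNow(): restore the flag and exit quietly.
                Thread.currentThread().interrupt();
            }
        });
    }

    void process(String item) {
        started.countDown(); // signals that an item is now in flight
        try {
            Thread.sleep(200); // stands in for slow processing
            processed.incrementAndGet();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Stops consumption first, then waits for in-flight items to finish.
    boolean shutdown(long timeoutSeconds) throws InterruptedException {
        consumer.shutdownNow(); // interrupts the blocking take()
        consumer.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
        workers.shutdown(); // running tasks are not interrupted
        return workers.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
    }

    // Enqueues one item, shuts down while it is in flight, returns the processed count.
    static long runDemo() {
        GracefulShutdownSketch sketch = new GracefulShutdownSketch();
        sketch.start();
        try {
            sketch.queue.put("mail-1");
            sketch.started.await();
            sketch.shutdown(5);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sketch.processed.get();
    }

    public static void main(String[] args) {
        System.out.println("processed=" + runDemo());
    }
}
```

Because the consumer runs on its own executor, it can be interrupted without
touching the worker threads, so an item that was already picked up gets the
chance to complete before the workers are shut down.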
 .../flow/PauseThenCountingExecutionMailet.java     |  47 ++++++++++
 .../mailets/flow/ShutDownIntegrationTest.java      | 104 +++++++++++++++++++++
 .../mailetcontainer/impl/JamesMailSpooler.java     |   9 +-
 3 files changed, 158 insertions(+), 2 deletions(-)

diff --git a/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/flow/PauseThenCountingExecutionMailet.java b/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/flow/PauseThenCountingExecutionMailet.java
new file mode 100644
index 0000000000..50aceceb94
--- /dev/null
+++ b/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/flow/PauseThenCountingExecutionMailet.java
@@ -0,0 +1,47 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailets.flow;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.mailet.Mail;
+import org.apache.mailet.base.GenericMailet;
+
+public class PauseThenCountingExecutionMailet extends GenericMailet {
+    private static final AtomicLong executionCount = new AtomicLong();
+
+    public static void reset() {
+        executionCount.set(0L);
+    }
+
+    public static long executionCount() {
+        return executionCount.get();
+    }
+
+    @Override
+    public void service(Mail mail) {
+        try {
+            Thread.sleep(500);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+        executionCount.incrementAndGet();
+    }
+}
diff --git a/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/flow/ShutDownIntegrationTest.java b/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/flow/ShutDownIntegrationTest.java
new file mode 100644
index 0000000000..15615ce46e
--- /dev/null
+++ b/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/flow/ShutDownIntegrationTest.java
@@ -0,0 +1,104 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailets.flow;
+
+import static org.apache.james.mailets.configuration.Constants.DEFAULT_DOMAIN;
+import static org.apache.james.mailets.configuration.Constants.LOCALHOST_IP;
+import static org.apache.james.mailets.configuration.Constants.PASSWORD;
+import static org.apache.james.mailets.configuration.Constants.RECIPIENT;
+import static org.apache.james.mailets.configuration.Constants.awaitAtMostOneMinute;
+
+import java.io.File;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.james.MemoryJamesServerMain;
+import org.apache.james.mailets.TemporaryJamesServer;
+import org.apache.james.mailets.configuration.CommonProcessors;
+import org.apache.james.mailets.configuration.MailetConfiguration;
+import org.apache.james.mailets.configuration.ProcessorConfiguration;
+import org.apache.james.modules.protocols.SmtpGuiceProbe;
+import org.apache.james.probe.DataProbe;
+import org.apache.james.transport.mailets.Null;
+import org.apache.james.transport.matchers.All;
+import org.apache.james.utils.DataProbeImpl;
+import org.apache.james.utils.SMTPMessageSender;
+import org.apache.james.utils.TestIMAPClient;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+
+class ShutDownIntegrationTest {
+    private static final String POSTMASTER = "postmaster@" + DEFAULT_DOMAIN;
+
+    @RegisterExtension
+    public TestIMAPClient testIMAPClient = new TestIMAPClient();
+    @RegisterExtension
+    public SMTPMessageSender messageSender = new SMTPMessageSender(DEFAULT_DOMAIN);
+
+    private TemporaryJamesServer jamesServer;
+
+    @BeforeEach
+    void setUp(@TempDir File temporaryFolder) throws Exception {
+        jamesServer = TemporaryJamesServer.builder()
+            .withBase(MemoryJamesServerMain.IN_MEMORY_SERVER_AGGREGATE_MODULE)
+            .withMailetContainer(TemporaryJamesServer.defaultMailetContainerConfiguration()
+                .postmaster(POSTMASTER)
+                .putProcessor(transport()))
+            .build(temporaryFolder);
+        jamesServer.start();
+
+        DataProbe dataProbe = jamesServer.getProbe(DataProbeImpl.class);
+        dataProbe.addDomain(DEFAULT_DOMAIN);
+        dataProbe.addUser(RECIPIENT, PASSWORD);
+    }
+
+    @Test
+    void shouldAllowADelayToProcessPendingMails() throws Exception {
+        messageSender.connect(LOCALHOST_IP, jamesServer.getProbe(SmtpGuiceProbe.class).getSmtpPort())
+            .authenticate(RECIPIENT, PASSWORD)
+            .sendMessage(RECIPIENT, RECIPIENT);
+
+        AtomicBoolean isOff = new AtomicBoolean(false);
+        Mono.fromRunnable(() -> {
+            jamesServer.shutdown();
+            isOff.set(true);
+        })
+            .subscribeOn(Schedulers.boundedElastic())
+            .subscribe();
+
+        awaitAtMostOneMinute.until(() -> PauseThenCountingExecutionMailet.executionCount() == 1);
+        awaitAtMostOneMinute.until(isOff::get);
+    }
+
+    private ProcessorConfiguration.Builder transport() {
+        return ProcessorConfiguration.transport()
+            .addMailet(MailetConfiguration.builder()
+                .matcher(All.class)
+                .mailet(PauseThenCountingExecutionMailet.class))
+            .addMailet(MailetConfiguration.builder()
+                .matcher(All.class)
+                .mailet(Null.class))
+            .addMailetsFrom(CommonProcessors.transport());
+    }
+}
diff --git a/server/mailet/mailetcontainer-impl/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java b/server/mailet/mailetcontainer-impl/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java
index bdb2f7c71a..d940188ecf 100644
--- a/server/mailet/mailetcontainer-impl/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java
+++ b/server/mailet/mailetcontainer-impl/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java
@@ -82,6 +82,7 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB
         private final MailQueue queue;
         private final Configuration configuration;
         private final Scheduler scheduler;
+        private final Scheduler queueScheduler;
 
         private Runner(MetricFactory metricFactory, GaugeRegistry gaugeRegistry, MailProcessor mailProcessor,
                        MailRepository errorRepository, MailQueue queue, Configuration configuration) {
@@ -94,6 +95,9 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB
             scheduler = Schedulers.newBoundedElastic(configuration.getConcurrencyLevel() + 1, DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
                 "spooler");
 
+            queueScheduler = Schedulers.newBoundedElastic(1, DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
+                "queueScheduler");
+
             this.disposable = run(queue);
 
             gaugeRegistry.register(SPOOL_PROCESSING + ".inFlight",
@@ -104,7 +108,7 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB
             return Flux.from(queue.deQueue())
                 .flatMap(item -> handleOnQueueItem(item).subscribeOn(scheduler), configuration.getConcurrencyLevel())
                 .onErrorContinue((throwable, item) -> LOGGER.error("Exception processing mail while spooling {}", item, throwable))
-                .subscribeOn(scheduler)
+                .subscribeOn(queueScheduler)
                 .subscribe();
         }
 
@@ -191,12 +195,13 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB
         public void dispose() {
             LOGGER.info("start dispose() ...");
             LOGGER.info("Cancel queue consumption...");
-            disposable.dispose();
+            queueScheduler.dispose();
             LOGGER.info("Queue consumption canceled, shutting down processor threads...");
             scheduler.disposeGracefully()
                 .timeout(Duration.ofSeconds(5))
                 .onErrorResume(e -> Mono.empty())
                 .block();
+            disposable.dispose();
             LOGGER.info("Thread shutdown completed. Turning off mail queue.");
             try {
                 queue.close();




[james-project] 01/02: JAMES-3891 Memory mailqueue should not display worrying logs upon shutdown

Posted by bt...@apache.org.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 1e25f478e6d9a05f7f525a1373abb517c8618ae7
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Mon Mar 13 15:13:35 2023 +0700

    JAMES-3891 Memory mailqueue should not display worrying logs upon shutdown
    
    Manage Interrupted exception
---
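Note on the interrupt handling mentioned in the commit message: the usual
idiom when a blocking take() is interrupted during shutdown is to restore the
thread's interrupt flag and return quietly instead of surfacing an error. The
sketch below shows that idiom with JDK types only. It is not the
MemoryMailQueueFactory code; takeQuietly() and demo() are hypothetical helper
names used for illustration.

```java
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch: names are illustrative and do not exist in James.
class InterruptAwareTakeSketch {
    // Returns the next item, or empty if the waiting thread was interrupted.
    static <T> Optional<T> takeQuietly(BlockingQueue<T> queue) {
        try {
            return Optional.of(queue.take());
        } catch (InterruptedException e) {
            // Preserve the interruption for callers instead of logging an error.
            Thread.currentThread().interrupt();
            return Optional.empty();
        }
    }

    // Interrupts a consumer blocked on an empty queue and reports whether it
    // ended with no item and with its interrupt flag still set.
    static boolean demo() {
        BlockingQueue<String> emptyQueue = new LinkedBlockingQueue<>();
        AtomicBoolean ok = new AtomicBoolean(false);
        Thread consumer = new Thread(() -> {
            Optional<String> item = takeQuietly(emptyQueue);
            ok.set(!item.isPresent() && Thread.currentThread().isInterrupted());
        });
        consumer.start();
        consumer.interrupt();
        try {
            consumer.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ok.get();
    }

    public static void main(String[] args) {
        System.out.println("quiet interrupt handled: " + demo());
    }
}
```

Restoring the flag matters because code further up the call stack, such as an
executor or scheduler, may rely on it to notice that the thread was asked to
stop.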
 .../org/apache/james/queue/memory/MemoryMailQueueFactory.java     | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java b/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java
index 10622462ef..73bf35f1d7 100644
--- a/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java
+++ b/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java
@@ -116,7 +116,13 @@ public class MemoryMailQueueFactory implements MailQueueFactory<MemoryMailQueueF
             this.inProcessingMailItems = new LinkedBlockingDeque<>();
             this.name = name;
             this.scheduler = Schedulers.newSingle("memory-mail-queue");
-            this.flux = Mono.fromCallable(mailItems::take)
+            this.flux = Mono.<MemoryMailQueueItem>create(sink -> {
+                try {
+                    sink.success(mailItems.take());
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                }
+            })
                 .repeat()
                 .subscribeOn(scheduler)
                 .flatMap(item ->

