You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2023/02/25 14:12:19 UTC

[james-project] branch master updated: JAMES-3888 Sequential execution for IMAP (#1448)

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/master by this push:
     new 01869c7ff9 JAMES-3888 Sequential execution for IMAP (#1448)
01869c7ff9 is described below

commit 01869c7ff970cbcb8fc4111f9e0deebf2922d7fe
Author: Benoit TELLIER <bt...@linagora.com>
AuthorDate: Sat Feb 25 21:12:14 2023 +0700

    JAMES-3888 Sequential execution for IMAP (#1448)
---
 .../netty/ImapChannelUpstreamHandler.java          |  4 +-
 .../james/imapserver/netty/Linearalizer.java       | 71 ++++++++++++++++++++++
 .../james/imapserver/netty/NettyConstants.java     |  1 +
 .../james/imapserver/netty/IMAPServerTest.java     | 52 ++++++++++++++++
 .../james/imapserver/netty/LinearalizerTest.java   | 71 ++++++++++++++++++++++
 5 files changed, 198 insertions(+), 1 deletion(-)

diff --git a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java
index d64d77ed89..ff487c8fb1 100644
--- a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java
+++ b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java
@@ -174,6 +174,7 @@ public class ImapChannelUpstreamHandler extends ChannelInboundHandlerAdapter imp
             authenticationConfiguration.isPlainAuthEnabled(), sessionId,
             authenticationConfiguration.getOidcSASLConfiguration());
         ctx.channel().attr(IMAP_SESSION_ATTRIBUTE_KEY).set(imapsession);
+        ctx.channel().attr(LINEARALIZER_ATTRIBUTE_KEY).set(new Linearalizer());
         MDCBuilder boundMDC = IMAPMDCContext.boundMDC(ctx)
             .addToContext(MDCBuilder.SESSION_ID, sessionId.asString());
         imapsession.setAttribute(MDC_KEY, boundMDC);
@@ -315,6 +316,7 @@ public class ImapChannelUpstreamHandler extends ChannelInboundHandlerAdapter imp
     public void channelRead(ChannelHandlerContext ctx, Object msg) {
         imapCommandsMetric.increment();
         ImapSession session = ctx.channel().attr(IMAP_SESSION_ATTRIBUTE_KEY).get();
+        Linearalizer linearalizer = ctx.channel().attr(LINEARALIZER_ATTRIBUTE_KEY).get();
         Attribute<Disposable> disposableAttribute = ctx.channel().attr(REQUEST_IN_FLIGHT_ATTRIBUTE_KEY);
         ChannelImapResponseWriter writer = new ChannelImapResponseWriter(ctx.channel());
         ImapResponseComposerImpl response = new ImapResponseComposerImpl(writer);
@@ -324,7 +326,7 @@ public class ImapChannelUpstreamHandler extends ChannelInboundHandlerAdapter imp
         beforeIDLEUponProcessing(ctx);
         ResponseEncoder responseEncoder = new ResponseEncoder(encoder, response);
         Disposable disposable = reactiveThrottler.throttle(
-            processor.processReactive(message, responseEncoder, session)
+            linearalizer.execute(processor.processReactive(message, responseEncoder, session))
                 .doOnEach(Throwing.consumer(signal -> {
                     if (session.getState() == ImapSessionState.LOGOUT) {
                         // Make sure we close the channel after all the buffers were flushed out
diff --git a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/Linearalizer.java b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/Linearalizer.java
new file mode 100644
index 0000000000..05cf3f9e20
--- /dev/null
+++ b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/Linearalizer.java
@@ -0,0 +1,71 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.imapserver.netty;
+
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.reactivestreams.Publisher;
+
+import reactor.core.publisher.Mono;
+import reactor.core.publisher.Sinks;
+
+public class Linearalizer {
+    private final ReentrantLock lock = new ReentrantLock();
+    private boolean inFlight = false;
+    private final Queue<Publisher<Void>> queue = new ConcurrentLinkedQueue<>();
+
+    public Mono<Void> execute(Publisher<Void> task) {
+        lock.lock();
+        try {
+            if (!inFlight) {
+                inFlight = true;
+                return Mono.from(task)
+                    .doFinally(any -> onRequestDone());
+            } else {
+                // Queue the request for later
+                Sinks.One<Void> one = Sinks.one();
+                queue.add(Mono.from(task)
+                    .then(Mono.fromRunnable(() -> one.emitEmpty(Sinks.EmitFailureHandler.FAIL_FAST))));
+                // Let the caller await task completion
+                return one.asMono();
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    private void onRequestDone() {
+        lock.lock();
+        try {
+            Publisher<Void> throttled = queue.poll();
+            if (throttled != null) {
+                Mono.from(throttled)
+                    .doFinally(any -> onRequestDone())
+                    .subscribe();
+            } else {
+                inFlight = false;
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+}
diff --git a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyConstants.java b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyConstants.java
index 0db9543968..ca83f40cde 100644
--- a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyConstants.java
+++ b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyConstants.java
@@ -41,6 +41,7 @@ public interface NettyConstants {
     String HEARTBEAT_HANDLER = "heartbeatHandler";
 
     AttributeKey<ImapSession> IMAP_SESSION_ATTRIBUTE_KEY = AttributeKey.valueOf("ImapSession");
+    AttributeKey<Linearalizer> LINEARALIZER_ATTRIBUTE_KEY = AttributeKey.valueOf("Linearalizer");
     AttributeKey<Disposable> REQUEST_IN_FLIGHT_ATTRIBUTE_KEY = AttributeKey.valueOf("requestInFlight");
     AttributeKey<Map<String, Object>> FRAME_DECODE_ATTACHMENT_ATTRIBUTE_KEY  = AttributeKey.valueOf("FrameDecoderMap");
 
diff --git a/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/IMAPServerTest.java b/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/IMAPServerTest.java
index 889ca4a2f6..a293b2904f 100644
--- a/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/IMAPServerTest.java
+++ b/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/IMAPServerTest.java
@@ -2250,6 +2250,58 @@ class IMAPServerTest {
         }
     }
 
+    @Nested
+    class SequentialExecution {
+        IMAPServer imapServer;
+        private MailboxSession mailboxSession;
+        private MessageManager inbox;
+        private SocketChannel clientConnection;
+
+        @BeforeEach
+        void beforeEach() throws Exception {
+            imapServer = createImapServer("imapServer.xml");
+            int port = imapServer.getListenAddresses().get(0).getPort();
+            mailboxSession = memoryIntegrationResources.getMailboxManager().createSystemSession(USER);
+            memoryIntegrationResources.getMailboxManager()
+                .createMailbox(MailboxPath.inbox(USER), mailboxSession);
+            inbox = memoryIntegrationResources.getMailboxManager().getMailbox(MailboxPath.inbox(USER), mailboxSession);
+            setUpTestingData();
+
+            clientConnection = SocketChannel.open();
+            clientConnection.connect(new InetSocketAddress(LOCALHOST_IP, port));
+            readBytes(clientConnection);
+        }
+
+        @AfterEach
+        void tearDown() throws Exception {
+            clientConnection.close();
+            imapServer.destroy();
+        }
+
+        private void setUpTestingData() {
+            IntStream.range(0, 37)
+                .forEach(Throwing.intConsumer(i -> inbox.appendMessage(MessageManager.AppendCommand.builder()
+                    .build("MIME-Version: 1.0\r\n\r\nCONTENT\r\n"), mailboxSession)));
+        }
+
+        @Test
+        void ensureSequentialExecutionOfImapRequests() throws Exception {
+            IntStream.range(0, 100)
+                .forEach(Throwing.intConsumer(i -> inbox.appendMessage(MessageManager.AppendCommand.builder()
+                    .build("MIME-Version: 1.0\r\n\r\nCONTENT\r\n"), mailboxSession)));
+
+            clientConnection.write(ByteBuffer.wrap(String.format("a0 LOGIN %s %s\r\n", USER.asString(), USER_PASS).getBytes(StandardCharsets.UTF_8)));
+            readBytes(clientConnection);
+
+            clientConnection.write(ByteBuffer.wrap(("A1 SELECT INBOX\r\nA2 UID FETCH 1:100 (FLAGS)\r\n").getBytes(StandardCharsets.UTF_8)));
+
+            // Select completes first
+            readStringUntil(clientConnection, s -> s.contains("A1 OK [READ-WRITE] SELECT completed."));
+            // Then the FETCH
+            readStringUntil(clientConnection, s -> s.contains("A2 OK FETCH completed."));
+        }
+    }
+
     private byte[] readBytes(SocketChannel channel) throws IOException {
         ByteBuffer line = ByteBuffer.allocate(1024);
         channel.read(line);
diff --git a/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/LinearalizerTest.java b/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/LinearalizerTest.java
new file mode 100644
index 0000000000..9d745a6188
--- /dev/null
+++ b/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/LinearalizerTest.java
@@ -0,0 +1,71 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.imapserver.netty;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.time.Duration;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.junit.jupiter.api.Test;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+
+import reactor.core.publisher.Mono;
+
+class LinearalizerTest {
+    @Test
+    void shouldExecuteSubmittedTasks() {
+        Linearalizer testee = new Linearalizer();
+
+        // When I submit a task
+        AtomicBoolean executed = new AtomicBoolean(false);
+        Mono.from(testee.execute(Mono.delay(Duration.ofMillis(50)).then(Mono.fromRunnable(() -> executed.getAndSet(true))))).block();
+
+        // Then that task is executed
+        assertThat(executed.get()).isTrue();
+    }
+
+    @Test
+    void shouldNotExecuteQueuedTasksLogicRightAway() {
+        Linearalizer testee = new Linearalizer();
+
+        // When I submit 2 tasks
+        AtomicBoolean executed = new AtomicBoolean(false);
+        Mono.from(testee.execute(Mono.delay(Duration.ofMillis(200)).then())).subscribe();
+        Mono.from(testee.execute(Mono.fromRunnable(() -> executed.getAndSet(true)))).subscribe();
+
+        // Then the second task is not executed straight away
+        assertThat(executed.get()).isFalse();
+    }
+
+    @Test
+    void shouldEventuallyExecuteQueuedTasks() {
+        Linearalizer testee = new Linearalizer();
+
+        // When I submit 2 tasks
+        AtomicBoolean executed = new AtomicBoolean(false);
+        Mono.from(testee.execute(Mono.delay(Duration.ofMillis(200)).then())).subscribe();
+        Mono.from(testee.execute(Mono.fromRunnable(() -> executed.getAndSet(true)))).subscribe();
+
+        // Then that task is eventually executed
+        Awaitility.await().atMost(Duration.ofSeconds(10))
+            .untilAsserted(() -> assertThat(executed.get()).isTrue());
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org