You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2023/02/25 14:12:19 UTC
[james-project] branch master updated: JAMES-3888 Sequential execution for IMAP (#1448)
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push:
new 01869c7ff9 JAMES-3888 Sequential execution for IMAP (#1448)
01869c7ff9 is described below
commit 01869c7ff970cbcb8fc4111f9e0deebf2922d7fe
Author: Benoit TELLIER <bt...@linagora.com>
AuthorDate: Sat Feb 25 21:12:14 2023 +0700
JAMES-3888 Sequential execution for IMAP (#1448)
---
.../netty/ImapChannelUpstreamHandler.java | 4 +-
.../james/imapserver/netty/Linearalizer.java | 71 ++++++++++++++++++++++
.../james/imapserver/netty/NettyConstants.java | 1 +
.../james/imapserver/netty/IMAPServerTest.java | 52 ++++++++++++++++
.../james/imapserver/netty/LinearalizerTest.java | 71 ++++++++++++++++++++++
5 files changed, 198 insertions(+), 1 deletion(-)
diff --git a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java
index d64d77ed89..ff487c8fb1 100644
--- a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java
+++ b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java
@@ -174,6 +174,7 @@ public class ImapChannelUpstreamHandler extends ChannelInboundHandlerAdapter imp
authenticationConfiguration.isPlainAuthEnabled(), sessionId,
authenticationConfiguration.getOidcSASLConfiguration());
ctx.channel().attr(IMAP_SESSION_ATTRIBUTE_KEY).set(imapsession);
+ ctx.channel().attr(LINEARALIZER_ATTRIBUTE_KEY).set(new Linearalizer());
MDCBuilder boundMDC = IMAPMDCContext.boundMDC(ctx)
.addToContext(MDCBuilder.SESSION_ID, sessionId.asString());
imapsession.setAttribute(MDC_KEY, boundMDC);
@@ -315,6 +316,7 @@ public class ImapChannelUpstreamHandler extends ChannelInboundHandlerAdapter imp
public void channelRead(ChannelHandlerContext ctx, Object msg) {
imapCommandsMetric.increment();
ImapSession session = ctx.channel().attr(IMAP_SESSION_ATTRIBUTE_KEY).get();
+ Linearalizer linearalizer = ctx.channel().attr(LINEARALIZER_ATTRIBUTE_KEY).get();
Attribute<Disposable> disposableAttribute = ctx.channel().attr(REQUEST_IN_FLIGHT_ATTRIBUTE_KEY);
ChannelImapResponseWriter writer = new ChannelImapResponseWriter(ctx.channel());
ImapResponseComposerImpl response = new ImapResponseComposerImpl(writer);
@@ -324,7 +326,7 @@ public class ImapChannelUpstreamHandler extends ChannelInboundHandlerAdapter imp
beforeIDLEUponProcessing(ctx);
ResponseEncoder responseEncoder = new ResponseEncoder(encoder, response);
Disposable disposable = reactiveThrottler.throttle(
- processor.processReactive(message, responseEncoder, session)
+ linearalizer.execute(processor.processReactive(message, responseEncoder, session))
.doOnEach(Throwing.consumer(signal -> {
if (session.getState() == ImapSessionState.LOGOUT) {
// Make sure we close the channel after all the buffers were flushed out
diff --git a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/Linearalizer.java b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/Linearalizer.java
new file mode 100644
index 0000000000..05cf3f9e20
--- /dev/null
+++ b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/Linearalizer.java
@@ -0,0 +1,71 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.imapserver.netty;
+
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.reactivestreams.Publisher;
+
+import reactor.core.publisher.Mono;
+import reactor.core.publisher.Sinks;
+
+public class Linearalizer {
+ private final ReentrantLock lock = new ReentrantLock();
+ private boolean inFlight = false;
+ private final Queue<Publisher<Void>> queue = new ConcurrentLinkedQueue<>();
+
+ public Mono<Void> execute(Publisher<Void> task) {
+ lock.lock();
+ try {
+ if (!inFlight) {
+ inFlight = true;
+ return Mono.from(task)
+ .doFinally(any -> onRequestDone());
+ } else {
+ // Queue the request for later
+ Sinks.One<Void> one = Sinks.one();
+ queue.add(Mono.from(task)
+ .then(Mono.fromRunnable(() -> one.emitEmpty(Sinks.EmitFailureHandler.FAIL_FAST))));
+ // Let the caller await task completion
+ return one.asMono();
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ private void onRequestDone() {
+ lock.lock();
+ try {
+ Publisher<Void> throttled = queue.poll();
+ if (throttled != null) {
+ Mono.from(throttled)
+ .doFinally(any -> onRequestDone())
+ .subscribe();
+ } else {
+ inFlight = false;
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+}
diff --git a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyConstants.java b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyConstants.java
index 0db9543968..ca83f40cde 100644
--- a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyConstants.java
+++ b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyConstants.java
@@ -41,6 +41,7 @@ public interface NettyConstants {
String HEARTBEAT_HANDLER = "heartbeatHandler";
AttributeKey<ImapSession> IMAP_SESSION_ATTRIBUTE_KEY = AttributeKey.valueOf("ImapSession");
+ AttributeKey<Linearalizer> LINEARALIZER_ATTRIBUTE_KEY = AttributeKey.valueOf("Linearalizer");
AttributeKey<Disposable> REQUEST_IN_FLIGHT_ATTRIBUTE_KEY = AttributeKey.valueOf("requestInFlight");
AttributeKey<Map<String, Object>> FRAME_DECODE_ATTACHMENT_ATTRIBUTE_KEY = AttributeKey.valueOf("FrameDecoderMap");
diff --git a/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/IMAPServerTest.java b/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/IMAPServerTest.java
index 889ca4a2f6..a293b2904f 100644
--- a/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/IMAPServerTest.java
+++ b/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/IMAPServerTest.java
@@ -2250,6 +2250,58 @@ class IMAPServerTest {
}
}
+ @Nested
+ class SequentialExecution {
+ IMAPServer imapServer;
+ private MailboxSession mailboxSession;
+ private MessageManager inbox;
+ private SocketChannel clientConnection;
+
+ @BeforeEach
+ void beforeEach() throws Exception {
+ imapServer = createImapServer("imapServer.xml");
+ int port = imapServer.getListenAddresses().get(0).getPort();
+ mailboxSession = memoryIntegrationResources.getMailboxManager().createSystemSession(USER);
+ memoryIntegrationResources.getMailboxManager()
+ .createMailbox(MailboxPath.inbox(USER), mailboxSession);
+ inbox = memoryIntegrationResources.getMailboxManager().getMailbox(MailboxPath.inbox(USER), mailboxSession);
+ setUpTestingData();
+
+ clientConnection = SocketChannel.open();
+ clientConnection.connect(new InetSocketAddress(LOCALHOST_IP, port));
+ readBytes(clientConnection);
+ }
+
+ @AfterEach
+ void tearDown() throws Exception {
+ clientConnection.close();
+ imapServer.destroy();
+ }
+
+ private void setUpTestingData() {
+ IntStream.range(0, 37)
+ .forEach(Throwing.intConsumer(i -> inbox.appendMessage(MessageManager.AppendCommand.builder()
+ .build("MIME-Version: 1.0\r\n\r\nCONTENT\r\n"), mailboxSession)));
+ }
+
+ @Test
+ void ensureSequentialExecutionOfImapRequests() throws Exception {
+ IntStream.range(0, 100)
+ .forEach(Throwing.intConsumer(i -> inbox.appendMessage(MessageManager.AppendCommand.builder()
+ .build("MIME-Version: 1.0\r\n\r\nCONTENT\r\n"), mailboxSession)));
+
+ clientConnection.write(ByteBuffer.wrap(String.format("a0 LOGIN %s %s\r\n", USER.asString(), USER_PASS).getBytes(StandardCharsets.UTF_8)));
+ readBytes(clientConnection);
+
+ clientConnection.write(ByteBuffer.wrap(("A1 SELECT INBOX\r\nA2 UID FETCH 1:100 (FLAGS)\r\n").getBytes(StandardCharsets.UTF_8)));
+
+ // Select completes first
+ readStringUntil(clientConnection, s -> s.contains("A1 OK [READ-WRITE] SELECT completed."));
+ // Then the FETCH
+ readStringUntil(clientConnection, s -> s.contains("A2 OK FETCH completed."));
+ }
+ }
+
private byte[] readBytes(SocketChannel channel) throws IOException {
ByteBuffer line = ByteBuffer.allocate(1024);
channel.read(line);
diff --git a/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/LinearalizerTest.java b/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/LinearalizerTest.java
new file mode 100644
index 0000000000..9d745a6188
--- /dev/null
+++ b/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/LinearalizerTest.java
@@ -0,0 +1,71 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.imapserver.netty;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.time.Duration;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.junit.jupiter.api.Test;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+
+import reactor.core.publisher.Mono;
+
+class LinearalizerTest {
+ @Test
+ void shouldExecuteSubmittedTasks() {
+ Linearalizer testee = new Linearalizer();
+
+ // When I submit a task
+ AtomicBoolean executed = new AtomicBoolean(false);
+ Mono.from(testee.execute(Mono.delay(Duration.ofMillis(50)).then(Mono.fromRunnable(() -> executed.getAndSet(true))))).block();
+
+ // Then that task is executed
+ assertThat(executed.get()).isTrue();
+ }
+
+ @Test
+ void shouldNotExecuteQueuedTasksLogicRightAway() {
+ Linearalizer testee = new Linearalizer();
+
+ // When I submit 2 tasks task
+ AtomicBoolean executed = new AtomicBoolean(false);
+ Mono.from(testee.execute(Mono.delay(Duration.ofMillis(200)).then())).subscribe();
+ Mono.from(testee.execute(Mono.fromRunnable(() -> executed.getAndSet(true)))).subscribe();
+
+ // Then the second task is not executed staight away
+ assertThat(executed.get()).isFalse();
+ }
+
+ @Test
+ void shouldEventuallyExecuteQueuedTasks() {
+ Linearalizer testee = new Linearalizer();
+
+ // When I submit 2 tasks task
+ AtomicBoolean executed = new AtomicBoolean(false);
+ Mono.from(testee.execute(Mono.delay(Duration.ofMillis(200)).then())).subscribe();
+ Mono.from(testee.execute(Mono.fromRunnable(() -> executed.getAndSet(true)))).subscribe();
+
+ // Then that task is eventually executed
+ Awaitility.await().atMost(Duration.ofSeconds(10))
+ .untilAsserted(() -> assertThat(executed.get()).isTrue());
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org