You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/04/17 15:30:46 UTC

[james-project] 12/16: [REFACTORING] CassandraMessageDAO should only care about retrieving a single element

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 3d0bc77731d05af96a712e815ecd419822f85e0e
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Sun Apr 12 11:18:02 2020 +0700

    [REFACTORING] CassandraMessageDAO should only care about retrieving a single element
    
    Retrieving several elements is an heritage from the `SELECT ... IN`
    sementic (allowing to retrieve several element at once at the Cassandra
    level, but sub optimal as it comes at a great coordination cost).
    
    This sementic:
     - Makes the DAO responsible of applying the limit (untested)
     - Forces to have a found/not found semantic
     - Forces to have "reactor windowing" rather than a classic "flatMap"
     concurrency limitation
    
    Reactor allows easy composition of Mono, thus this costly semantic is
    no longer needed.
---
 .../cassandra/mail/CassandraMessageDAO.java        |  8 --------
 .../cassandra/mail/CassandraMessageMapper.java     | 15 +++++---------
 .../cassandra/mail/CassandraMessageDAOTest.java    | 24 ++++++++++------------
 .../java/org/apache/james/util/streams/Limit.java  |  8 ++++++++
 4 files changed, 24 insertions(+), 31 deletions(-)

diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
index fc92869..58e25e3 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
@@ -74,7 +74,6 @@ import org.apache.james.mailbox.store.mail.MessageMapper.FetchType;
 import org.apache.james.mailbox.store.mail.model.MailboxMessage;
 import org.apache.james.mailbox.store.mail.model.Property;
 import org.apache.james.mailbox.store.mail.model.impl.PropertyBuilder;
-import org.apache.james.util.streams.Limit;
 
 import com.datastax.driver.core.BoundStatement;
 import com.datastax.driver.core.PreparedStatement;
@@ -93,7 +92,6 @@ import com.google.common.primitives.Bytes;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
-import reactor.core.scheduler.Schedulers;
 import reactor.util.function.Tuple2;
 
 public class CassandraMessageDAO {
@@ -241,12 +239,6 @@ public class CassandraMessageDAO {
                 .flatMap(resultSet -> message(resultSet, id, fetchType));
     }
 
-    public Flux<MessageRepresentation> retrieveMessages(List<ComposedMessageIdWithMetaData> messageIds, FetchType fetchType, Limit limit) {
-        return Flux.fromStream(limit.applyOnStream(messageIds.stream().distinct()))
-            .publishOn(Schedulers.elastic())
-            .flatMap(id -> retrieveMessage(id, fetchType), configuration.getMessageReadChunkSize());
-    }
-
     private Mono<ResultSet> retrieveRow(ComposedMessageIdWithMetaData messageId, FetchType fetchType) {
         CassandraMessageId cassandraMessageId = (CassandraMessageId) messageId.getComposedMessageId().getMessageId();
 
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
index b7e6ed3..919da37 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
@@ -161,22 +161,17 @@ public class CassandraMessageMapper implements MessageMapper {
     @Override
     public Iterator<MailboxMessage> findInMailbox(Mailbox mailbox, MessageRange messageRange, FetchType ftype, int max) {
         CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
-        return retrieveMessageIds(mailboxId, messageRange)
-            .flatMap(ids -> retrieveMessages(ids, ftype, Limit.from(max)))
+        return Limit.from(max).applyOnFlux(
+            messageIdDAO.retrieveMessages(mailboxId, messageRange)
+                .flatMap(id -> retrieveMessage(id, ftype), cassandraConfiguration.getMessageReadChunkSize()))
             .map(MailboxMessage.class::cast)
             .sort(Comparator.comparing(MailboxMessage::getUid))
             .toIterable()
             .iterator();
     }
 
-    private Flux<List<ComposedMessageIdWithMetaData>> retrieveMessageIds(CassandraId mailboxId, MessageRange messageRange) {
-        return messageIdDAO.retrieveMessages(mailboxId, messageRange)
-            .window(cassandraConfiguration.getMessageReadChunkSize())
-            .flatMap(flux -> flux.collect(Guavate.toImmutableList()));
-    }
-
-    private Flux<MailboxMessage> retrieveMessages(List<ComposedMessageIdWithMetaData> messageIds, FetchType fetchType, Limit limit) {
-        return messageDAO.retrieveMessages(messageIds, fetchType, limit)
+    private Mono<MailboxMessage> retrieveMessage(ComposedMessageIdWithMetaData messageId, FetchType fetchType) {
+        return messageDAO.retrieveMessage(messageId, fetchType)
             .flatMap(messageRepresentation -> attachmentLoader.addAttachmentToMessage(messageRepresentation, fetchType));
     }
 
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOTest.java
index daf4767..d7fe3da 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOTest.java
@@ -53,7 +53,6 @@ import org.apache.james.mailbox.model.MessageId;
 import org.apache.james.mailbox.store.mail.MessageMapper;
 import org.apache.james.mailbox.store.mail.model.impl.PropertyBuilder;
 import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage;
-import org.apache.james.util.streams.Limit;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
@@ -63,7 +62,7 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.primitives.Bytes;
 
 import nl.jqno.equalsverifier.EqualsVerifier;
-import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
 
 class CassandraMessageDAOTest {
     private static final int BODY_START = 16;
@@ -86,7 +85,7 @@ class CassandraMessageDAOTest {
 
     private SimpleMailboxMessage message;
     private CassandraMessageId messageId;
-    private List<ComposedMessageIdWithMetaData> messageIds;
+    private ComposedMessageIdWithMetaData messageIdWithMetadata;
 
     @BeforeEach
     void setUp(CassandraCluster cassandra) {
@@ -97,11 +96,11 @@ class CassandraMessageDAOTest {
         testee = new CassandraMessageDAO(cassandra.getConf(), cassandra.getTypesProvider(), blobStore, blobIdFactory,
             new CassandraMessageId.Factory());
 
-        messageIds = ImmutableList.of(ComposedMessageIdWithMetaData.builder()
+        messageIdWithMetadata = ComposedMessageIdWithMetaData.builder()
                 .composedMessageId(new ComposedMessageId(MAILBOX_ID, messageId, messageUid))
                 .flags(new Flags())
                 .modSeq(ModSeq.of(1))
-                .build());
+                .build();
     }
 
     @Test
@@ -111,7 +110,7 @@ class CassandraMessageDAOTest {
         testee.save(message).block();
 
         MessageRepresentation attachmentRepresentation =
-            toMessage(testee.retrieveMessages(messageIds, MessageMapper.FetchType.Metadata, Limit.unlimited()));
+            toMessage(testee.retrieveMessage(messageIdWithMetadata, MessageMapper.FetchType.Metadata));
 
         assertThat(attachmentRepresentation.getPropertyBuilder().getTextualLineCount())
             .isEqualTo(0L);
@@ -127,7 +126,7 @@ class CassandraMessageDAOTest {
         testee.save(message).block();
 
         MessageRepresentation attachmentRepresentation =
-            toMessage(testee.retrieveMessages(messageIds, MessageMapper.FetchType.Metadata, Limit.unlimited()));
+            toMessage(testee.retrieveMessage(messageIdWithMetadata, MessageMapper.FetchType.Metadata));
 
         assertThat(attachmentRepresentation.getPropertyBuilder().getTextualLineCount()).isEqualTo(textualLineCount);
     }
@@ -139,7 +138,7 @@ class CassandraMessageDAOTest {
         testee.save(message).block();
 
         MessageRepresentation attachmentRepresentation =
-            toMessage(testee.retrieveMessages(messageIds, MessageMapper.FetchType.Full, Limit.unlimited()));
+            toMessage(testee.retrieveMessage(messageIdWithMetadata, MessageMapper.FetchType.Full));
 
         assertThat(IOUtils.toString(attachmentRepresentation.getContent(), StandardCharsets.UTF_8))
             .isEqualTo(CONTENT);
@@ -152,7 +151,7 @@ class CassandraMessageDAOTest {
         testee.save(message).block();
 
         MessageRepresentation attachmentRepresentation =
-            toMessage(testee.retrieveMessages(messageIds, MessageMapper.FetchType.Body, Limit.unlimited()));
+            toMessage(testee.retrieveMessage(messageIdWithMetadata, MessageMapper.FetchType.Body));
 
         byte[] expected = Bytes.concat(
             new byte[BODY_START],
@@ -168,7 +167,7 @@ class CassandraMessageDAOTest {
         testee.save(message).block();
 
         MessageRepresentation attachmentRepresentation =
-            toMessage(testee.retrieveMessages(messageIds, MessageMapper.FetchType.Headers, Limit.unlimited()));
+            toMessage(testee.retrieveMessage(messageIdWithMetadata, MessageMapper.FetchType.Headers));
 
         assertThat(IOUtils.toString(attachmentRepresentation.getContent(), StandardCharsets.UTF_8))
             .isEqualTo(CONTENT.substring(0, BODY_START));
@@ -189,9 +188,8 @@ class CassandraMessageDAOTest {
             .build();
     }
 
-    private MessageRepresentation toMessage(Flux<MessageRepresentation> read) {
-        return read.toStream()
-            .findAny()
+    private MessageRepresentation toMessage(Mono<MessageRepresentation> read) {
+        return read.blockOptional()
             .orElseThrow(() -> new IllegalStateException("Collection is not supposed to be empty"));
     }
 
diff --git a/server/container/util/src/main/java/org/apache/james/util/streams/Limit.java b/server/container/util/src/main/java/org/apache/james/util/streams/Limit.java
index 268ed5e..466f1fb 100644
--- a/server/container/util/src/main/java/org/apache/james/util/streams/Limit.java
+++ b/server/container/util/src/main/java/org/apache/james/util/streams/Limit.java
@@ -25,6 +25,8 @@ import java.util.stream.Stream;
 
 import com.google.common.base.Preconditions;
 
+import reactor.core.publisher.Flux;
+
 public class Limit {
 
     public static Limit from(int limit) {
@@ -65,6 +67,12 @@ public class Limit {
             .orElse(stream);
     }
 
+    public <T> Flux<T> applyOnFlux(Flux<T> flux) {
+        return limit
+            .map(flux::take)
+            .orElse(flux);
+    }
+
     @Override
     public final boolean equals(Object o) {
         if (o instanceof Limit) {


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org