You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/04/17 15:30:46 UTC
[james-project] 12/16: [REFACTORING] CassandraMessageDAO should
only care about retrieving a single element
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 3d0bc77731d05af96a712e815ecd419822f85e0e
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Sun Apr 12 11:18:02 2020 +0700
[REFACTORING] CassandraMessageDAO should only care about retrieving a single element
Retrieving several elements is an heritage from the `SELECT ... IN`
sementic (allowing to retrieve several element at once at the Cassandra
level, but sub optimal as it comes at a great coordination cost).
This sementic:
- Makes the DAO responsible of applying the limit (untested)
- Forces to have a found/not found semantic
- Forces to have "reactor windowing" rather than a classic "flatMap"
concurrency limitation
Reactor allows easy composition of Mono, thus this costly semantic is
no longer needed.
---
.../cassandra/mail/CassandraMessageDAO.java | 8 --------
.../cassandra/mail/CassandraMessageMapper.java | 15 +++++---------
.../cassandra/mail/CassandraMessageDAOTest.java | 24 ++++++++++------------
.../java/org/apache/james/util/streams/Limit.java | 8 ++++++++
4 files changed, 24 insertions(+), 31 deletions(-)
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
index fc92869..58e25e3 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
@@ -74,7 +74,6 @@ import org.apache.james.mailbox.store.mail.MessageMapper.FetchType;
import org.apache.james.mailbox.store.mail.model.MailboxMessage;
import org.apache.james.mailbox.store.mail.model.Property;
import org.apache.james.mailbox.store.mail.model.impl.PropertyBuilder;
-import org.apache.james.util.streams.Limit;
import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.PreparedStatement;
@@ -93,7 +92,6 @@ import com.google.common.primitives.Bytes;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
-import reactor.core.scheduler.Schedulers;
import reactor.util.function.Tuple2;
public class CassandraMessageDAO {
@@ -241,12 +239,6 @@ public class CassandraMessageDAO {
.flatMap(resultSet -> message(resultSet, id, fetchType));
}
- public Flux<MessageRepresentation> retrieveMessages(List<ComposedMessageIdWithMetaData> messageIds, FetchType fetchType, Limit limit) {
- return Flux.fromStream(limit.applyOnStream(messageIds.stream().distinct()))
- .publishOn(Schedulers.elastic())
- .flatMap(id -> retrieveMessage(id, fetchType), configuration.getMessageReadChunkSize());
- }
-
private Mono<ResultSet> retrieveRow(ComposedMessageIdWithMetaData messageId, FetchType fetchType) {
CassandraMessageId cassandraMessageId = (CassandraMessageId) messageId.getComposedMessageId().getMessageId();
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
index b7e6ed3..919da37 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
@@ -161,22 +161,17 @@ public class CassandraMessageMapper implements MessageMapper {
@Override
public Iterator<MailboxMessage> findInMailbox(Mailbox mailbox, MessageRange messageRange, FetchType ftype, int max) {
CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
- return retrieveMessageIds(mailboxId, messageRange)
- .flatMap(ids -> retrieveMessages(ids, ftype, Limit.from(max)))
+ return Limit.from(max).applyOnFlux(
+ messageIdDAO.retrieveMessages(mailboxId, messageRange)
+ .flatMap(id -> retrieveMessage(id, ftype), cassandraConfiguration.getMessageReadChunkSize()))
.map(MailboxMessage.class::cast)
.sort(Comparator.comparing(MailboxMessage::getUid))
.toIterable()
.iterator();
}
- private Flux<List<ComposedMessageIdWithMetaData>> retrieveMessageIds(CassandraId mailboxId, MessageRange messageRange) {
- return messageIdDAO.retrieveMessages(mailboxId, messageRange)
- .window(cassandraConfiguration.getMessageReadChunkSize())
- .flatMap(flux -> flux.collect(Guavate.toImmutableList()));
- }
-
- private Flux<MailboxMessage> retrieveMessages(List<ComposedMessageIdWithMetaData> messageIds, FetchType fetchType, Limit limit) {
- return messageDAO.retrieveMessages(messageIds, fetchType, limit)
+ private Mono<MailboxMessage> retrieveMessage(ComposedMessageIdWithMetaData messageId, FetchType fetchType) {
+ return messageDAO.retrieveMessage(messageId, fetchType)
.flatMap(messageRepresentation -> attachmentLoader.addAttachmentToMessage(messageRepresentation, fetchType));
}
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOTest.java
index daf4767..d7fe3da 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOTest.java
@@ -53,7 +53,6 @@ import org.apache.james.mailbox.model.MessageId;
import org.apache.james.mailbox.store.mail.MessageMapper;
import org.apache.james.mailbox.store.mail.model.impl.PropertyBuilder;
import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage;
-import org.apache.james.util.streams.Limit;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
@@ -63,7 +62,7 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.primitives.Bytes;
import nl.jqno.equalsverifier.EqualsVerifier;
-import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
class CassandraMessageDAOTest {
private static final int BODY_START = 16;
@@ -86,7 +85,7 @@ class CassandraMessageDAOTest {
private SimpleMailboxMessage message;
private CassandraMessageId messageId;
- private List<ComposedMessageIdWithMetaData> messageIds;
+ private ComposedMessageIdWithMetaData messageIdWithMetadata;
@BeforeEach
void setUp(CassandraCluster cassandra) {
@@ -97,11 +96,11 @@ class CassandraMessageDAOTest {
testee = new CassandraMessageDAO(cassandra.getConf(), cassandra.getTypesProvider(), blobStore, blobIdFactory,
new CassandraMessageId.Factory());
- messageIds = ImmutableList.of(ComposedMessageIdWithMetaData.builder()
+ messageIdWithMetadata = ComposedMessageIdWithMetaData.builder()
.composedMessageId(new ComposedMessageId(MAILBOX_ID, messageId, messageUid))
.flags(new Flags())
.modSeq(ModSeq.of(1))
- .build());
+ .build();
}
@Test
@@ -111,7 +110,7 @@ class CassandraMessageDAOTest {
testee.save(message).block();
MessageRepresentation attachmentRepresentation =
- toMessage(testee.retrieveMessages(messageIds, MessageMapper.FetchType.Metadata, Limit.unlimited()));
+ toMessage(testee.retrieveMessage(messageIdWithMetadata, MessageMapper.FetchType.Metadata));
assertThat(attachmentRepresentation.getPropertyBuilder().getTextualLineCount())
.isEqualTo(0L);
@@ -127,7 +126,7 @@ class CassandraMessageDAOTest {
testee.save(message).block();
MessageRepresentation attachmentRepresentation =
- toMessage(testee.retrieveMessages(messageIds, MessageMapper.FetchType.Metadata, Limit.unlimited()));
+ toMessage(testee.retrieveMessage(messageIdWithMetadata, MessageMapper.FetchType.Metadata));
assertThat(attachmentRepresentation.getPropertyBuilder().getTextualLineCount()).isEqualTo(textualLineCount);
}
@@ -139,7 +138,7 @@ class CassandraMessageDAOTest {
testee.save(message).block();
MessageRepresentation attachmentRepresentation =
- toMessage(testee.retrieveMessages(messageIds, MessageMapper.FetchType.Full, Limit.unlimited()));
+ toMessage(testee.retrieveMessage(messageIdWithMetadata, MessageMapper.FetchType.Full));
assertThat(IOUtils.toString(attachmentRepresentation.getContent(), StandardCharsets.UTF_8))
.isEqualTo(CONTENT);
@@ -152,7 +151,7 @@ class CassandraMessageDAOTest {
testee.save(message).block();
MessageRepresentation attachmentRepresentation =
- toMessage(testee.retrieveMessages(messageIds, MessageMapper.FetchType.Body, Limit.unlimited()));
+ toMessage(testee.retrieveMessage(messageIdWithMetadata, MessageMapper.FetchType.Body));
byte[] expected = Bytes.concat(
new byte[BODY_START],
@@ -168,7 +167,7 @@ class CassandraMessageDAOTest {
testee.save(message).block();
MessageRepresentation attachmentRepresentation =
- toMessage(testee.retrieveMessages(messageIds, MessageMapper.FetchType.Headers, Limit.unlimited()));
+ toMessage(testee.retrieveMessage(messageIdWithMetadata, MessageMapper.FetchType.Headers));
assertThat(IOUtils.toString(attachmentRepresentation.getContent(), StandardCharsets.UTF_8))
.isEqualTo(CONTENT.substring(0, BODY_START));
@@ -189,9 +188,8 @@ class CassandraMessageDAOTest {
.build();
}
- private MessageRepresentation toMessage(Flux<MessageRepresentation> read) {
- return read.toStream()
- .findAny()
+ private MessageRepresentation toMessage(Mono<MessageRepresentation> read) {
+ return read.blockOptional()
.orElseThrow(() -> new IllegalStateException("Collection is not supposed to be empty"));
}
diff --git a/server/container/util/src/main/java/org/apache/james/util/streams/Limit.java b/server/container/util/src/main/java/org/apache/james/util/streams/Limit.java
index 268ed5e..466f1fb 100644
--- a/server/container/util/src/main/java/org/apache/james/util/streams/Limit.java
+++ b/server/container/util/src/main/java/org/apache/james/util/streams/Limit.java
@@ -25,6 +25,8 @@ import java.util.stream.Stream;
import com.google.common.base.Preconditions;
+import reactor.core.publisher.Flux;
+
public class Limit {
public static Limit from(int limit) {
@@ -65,6 +67,12 @@ public class Limit {
.orElse(stream);
}
+ public <T> Flux<T> applyOnFlux(Flux<T> flux) {
+ return limit
+ .map(flux::take)
+ .orElse(flux);
+ }
+
@Override
public final boolean equals(Object o) {
if (o instanceof Limit) {
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org